1 /*	$OpenBSD: rf_dagffwr.c,v 1.5 2002/12/16 07:01:03 tdeval Exp $	*/
2 /*	$NetBSD: rf_dagffwr.c,v 1.5 2000/01/07 03:40:58 oster Exp $	*/
3 
4 /*
5  * Copyright (c) 1995 Carnegie-Mellon University.
6  * All rights reserved.
7  *
8  * Author: Mark Holland, Daniel Stodolsky, William V. Courtright II
9  *
10  * Permission to use, copy, modify and distribute this software and
11  * its documentation is hereby granted, provided that both the copyright
12  * notice and this permission notice appear in all copies of the
13  * software, derivative works or modified versions, and any portions
14  * thereof, and that both notices appear in supporting documentation.
15  *
16  * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
17  * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
18  * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
19  *
20  * Carnegie Mellon requests users of this software to return to
21  *
22  *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
23  *  School of Computer Science
24  *  Carnegie Mellon University
25  *  Pittsburgh PA 15213-3890
26  *
27  * any improvements or extensions that they make and grant Carnegie the
28  * rights to redistribute these changes.
29  */
30 
31 /*
 * rf_dagffwr.c
33  *
34  * Code for creating fault-free DAGs.
35  *
36  */
37 
38 #include "rf_types.h"
39 #include "rf_raid.h"
40 #include "rf_dag.h"
41 #include "rf_dagutils.h"
42 #include "rf_dagfuncs.h"
43 #include "rf_debugMem.h"
44 #include "rf_dagffrd.h"
45 #include "rf_memchunk.h"
46 #include "rf_general.h"
47 #include "rf_dagffwr.h"
48 
49 /*****************************************************************************
50  *
51  * General comments on DAG creation:
52  *
53  * All DAGs in this file use roll-away error recovery. Each DAG has a single
54  * commit node, usually called "Cmt."  If an error occurs before the Cmt node
55  * is reached, the execution engine will halt forward execution and work
56  * backward through the graph, executing the undo functions. Assuming that
 * each node in the graph prior to the Cmt node is undoable and atomic - or -
58  * does not make changes to permanent state, the graph will fail atomically.
59  * If an error occurs after the Cmt node executes, the engine will roll-forward
60  * through the graph, blindly executing nodes until it reaches the end.
61  * If a graph reaches the end, it is assumed to have completed successfully.
62  *
63  * A graph has only 1 Cmt node.
64  *
65  *****************************************************************************/
66 
67 
68 /*****************************************************************************
69  *
70  * The following wrappers map the standard DAG creation interface to the
71  * DAG creation routines. Additionally, these wrappers enable experimentation
72  * with new DAG structures by providing an extra level of indirection, allowing
73  * the DAG creation routines to be replaced at this single point.
74  *
75  *****************************************************************************/
76 
77 
78 void
rf_CreateNonRedundantWriteDAG(RF_Raid_t * raidPtr,RF_AccessStripeMap_t * asmap,RF_DagHeader_t * dag_h,void * bp,RF_RaidAccessFlags_t flags,RF_AllocListElem_t * allocList,RF_IoType_t type)79 rf_CreateNonRedundantWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
80     RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
81     RF_AllocListElem_t *allocList, RF_IoType_t type)
82 {
83 	rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
84 	    RF_IO_TYPE_WRITE);
85 }
86 
87 void
rf_CreateRAID0WriteDAG(RF_Raid_t * raidPtr,RF_AccessStripeMap_t * asmap,RF_DagHeader_t * dag_h,void * bp,RF_RaidAccessFlags_t flags,RF_AllocListElem_t * allocList,RF_IoType_t type)88 rf_CreateRAID0WriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
89     RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
90     RF_AllocListElem_t *allocList, RF_IoType_t type)
91 {
92 	rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
93 	    RF_IO_TYPE_WRITE);
94 }
95 
96 void
rf_CreateSmallWriteDAG(RF_Raid_t * raidPtr,RF_AccessStripeMap_t * asmap,RF_DagHeader_t * dag_h,void * bp,RF_RaidAccessFlags_t flags,RF_AllocListElem_t * allocList)97 rf_CreateSmallWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
98     RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
99     RF_AllocListElem_t *allocList)
100 {
101 	/* "normal" rollaway. */
102 	rf_CommonCreateSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags,
103 	    allocList, &rf_xorFuncs, NULL);
104 }
105 
106 void
rf_CreateLargeWriteDAG(RF_Raid_t * raidPtr,RF_AccessStripeMap_t * asmap,RF_DagHeader_t * dag_h,void * bp,RF_RaidAccessFlags_t flags,RF_AllocListElem_t * allocList)107 rf_CreateLargeWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
108     RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
109     RF_AllocListElem_t *allocList)
110 {
111 	/* "normal" rollaway. */
112 	rf_CommonCreateLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags,
113 	    allocList, 1, rf_RegularXorFunc, RF_TRUE);
114 }
115 
116 
117 /*****************************************************************************
118  *
119  * DAG creation code begins here.
120  *
121  *****************************************************************************/
122 
123 
124 /*****************************************************************************
125  *
126  * creates a DAG to perform a large-write operation:
127  *
128  *           / Rod \           / Wnd \
129  * H -- block- Rod - Xor - Cmt - Wnd --- T
130  *           \ Rod /          \  Wnp /
131  *                             \[Wnq]/
132  *
133  * The XOR node also does the Q calculation in the P+Q architecture.
134  * All nodes that are before the commit node (Cmt) are assumed to be atomic
135  * and undoable - or - they make no changes to permanent state.
136  *
137  * Rod = read old data
138  * Cmt = commit node
139  * Wnp = write new parity
140  * Wnd = write new data
141  * Wnq = write new "q"
142  * [] denotes optional segments in the graph.
143  *
144  * Parameters:  raidPtr	  - description of the physical array
145  *		asmap	  - logical & physical addresses for this access
146  *		bp	  - buffer ptr (holds write data)
147  *		flags	  - general flags (e.g. disk locking)
148  *		allocList - list of memory allocated in DAG creation
149  *		nfaults	  - number of faults array can tolerate
150  *			    (equal to # redundancy units in stripe)
151  *		redfuncs  - list of redundancy generating functions
152  *
153  *****************************************************************************/
154 
155 void
rf_CommonCreateLargeWriteDAG(RF_Raid_t * raidPtr,RF_AccessStripeMap_t * asmap,RF_DagHeader_t * dag_h,void * bp,RF_RaidAccessFlags_t flags,RF_AllocListElem_t * allocList,int nfaults,int (* redFunc)(RF_DagNode_t *),int allowBufferRecycle)156 rf_CommonCreateLargeWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
157     RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
158     RF_AllocListElem_t *allocList, int nfaults, int (*redFunc) (RF_DagNode_t *),
159     int allowBufferRecycle)
160 {
161 	RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
162 	RF_DagNode_t *wnqNode, *blockNode, *commitNode, *termNode;
163 	int nWndNodes, nRodNodes, i, nodeNum, asmNum;
164 	RF_AccessStripeMapHeader_t *new_asm_h[2];
165 	RF_StripeNum_t parityStripeID;
166 	char *sosBuffer, *eosBuffer;
167 	RF_ReconUnitNum_t which_ru;
168 	RF_RaidLayout_t *layoutPtr;
169 	RF_PhysDiskAddr_t *pda;
170 
171 	layoutPtr = &(raidPtr->Layout);
172 	parityStripeID = rf_RaidAddressToParityStripeID(layoutPtr,
173 	    asmap->raidAddress, &which_ru);
174 
175 	if (rf_dagDebug) {
176 		printf("[Creating large-write DAG]\n");
177 	}
178 	dag_h->creator = "LargeWriteDAG";
179 
180 	dag_h->numCommitNodes = 1;
181 	dag_h->numCommits = 0;
182 	dag_h->numSuccedents = 1;
183 
184 	/* Alloc the nodes: Wnd, xor, commit, block, term, and  Wnp. */
185 	nWndNodes = asmap->numStripeUnitsAccessed;
186 	RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t),
187 	    (RF_DagNode_t *), allocList);
188 	i = 0;
189 	wndNodes = &nodes[i];
190 	i += nWndNodes;
191 	xorNode = &nodes[i];
192 	i += 1;
193 	wnpNode = &nodes[i];
194 	i += 1;
195 	blockNode = &nodes[i];
196 	i += 1;
197 	commitNode = &nodes[i];
198 	i += 1;
199 	termNode = &nodes[i];
200 	i += 1;
201 	if (nfaults == 2) {
202 		wnqNode = &nodes[i];
203 		i += 1;
204 	} else {
205 		wnqNode = NULL;
206 	}
207 	rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h,
208 	    new_asm_h, &nRodNodes, &sosBuffer, &eosBuffer, allocList);
209 	if (nRodNodes > 0) {
210 		RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t),
211 		    (RF_DagNode_t *), allocList);
212 	} else {
213 		rodNodes = NULL;
214 	}
215 
216 	/* Begin node initialization. */
217 	if (nRodNodes > 0) {
218 		rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
219 		    rf_NullNodeUndoFunc, NULL, nRodNodes, 0, 0, 0, dag_h,
220 		    "Nil", allocList);
221 	} else {
222 		rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
223 		    rf_NullNodeUndoFunc, NULL, 1, 0, 0, 0, dag_h, "Nil",
224 		    allocList);
225 	}
226 
227 	rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc,
228 	    rf_NullNodeUndoFunc, NULL, nWndNodes + nfaults, 1, 0, 0,
229 	    dag_h, "Cmt", allocList);
230 	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
231 	    rf_TerminateUndoFunc, NULL, 0, nWndNodes + nfaults, 0, 0,
232 	    dag_h, "Trm", allocList);
233 
234 	/* Initialize the Rod nodes. */
235 	for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
236 		if (new_asm_h[asmNum]) {
237 			pda = new_asm_h[asmNum]->stripeMap->physInfo;
238 			while (pda) {
239 				rf_InitNode(&rodNodes[nodeNum], rf_wait,
240 				    RF_FALSE, rf_DiskReadFunc,
241 				    rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
242 				    1, 1, 4, 0, dag_h, "Rod", allocList);
243 				rodNodes[nodeNum].params[0].p = pda;
244 				rodNodes[nodeNum].params[1].p = pda->bufPtr;
245 				rodNodes[nodeNum].params[2].v = parityStripeID;
246 				rodNodes[nodeNum].params[3].v =
247 				    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
248 				    0, 0, which_ru);
249 				nodeNum++;
250 				pda = pda->next;
251 			}
252 		}
253 	}
254 	RF_ASSERT(nodeNum == nRodNodes);
255 
256 	/* Initialize the wnd nodes. */
257 	pda = asmap->physInfo;
258 	for (i = 0; i < nWndNodes; i++) {
259 		rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
260 		    rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
261 		    dag_h, "Wnd", allocList);
262 		RF_ASSERT(pda != NULL);
263 		wndNodes[i].params[0].p = pda;
264 		wndNodes[i].params[1].p = pda->bufPtr;
265 		wndNodes[i].params[2].v = parityStripeID;
266 		wndNodes[i].params[3].v =
267 		    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
268 		pda = pda->next;
269 	}
270 
271 	/* Initialize the redundancy node. */
272 	if (nRodNodes > 0) {
273 		rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc,
274 		    rf_NullNodeUndoFunc, NULL, 1, nRodNodes,
275 		    2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h,
276 		    "Xr ", allocList);
277 	} else {
278 		rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc,
279 		    rf_NullNodeUndoFunc, NULL, 1, 1,
280 		    2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h,
281 		    "Xr ", allocList);
282 	}
283 	xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
284 	for (i = 0; i < nWndNodes; i++) {
285 		xorNode->params[2 * i + 0] =
286 		    wndNodes[i].params[0];	/* pda */
287 		xorNode->params[2 * i + 1] =
288 		    wndNodes[i].params[1];	/* buf ptr */
289 	}
290 	for (i = 0; i < nRodNodes; i++) {
291 		xorNode->params[2 * (nWndNodes + i) + 0] =
292 		    rodNodes[i].params[0];	/* pda */
293 		xorNode->params[2 * (nWndNodes + i) + 1] =
294 		    rodNodes[i].params[1];	/* buf ptr */
295 	}
296 	/* Xor node needs to get at RAID information. */
297 	xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr;
298 
299 	/*
300 	 * Look for an Rod node that reads a complete SU. If none, alloc
301 	 * a buffer to receive the parity info. Note that we can't use a
302 	 * new data buffer because it will not have gotten written when
303 	 * the xor occurs.
304 	 */
305 	if (allowBufferRecycle) {
306 		for (i = 0; i < nRodNodes; i++) {
307 			if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)
308 			    ->numSector == raidPtr->Layout.sectorsPerStripeUnit)
309 				break;
310 		}
311 	}
312 	if ((!allowBufferRecycle) || (i == nRodNodes)) {
313 		RF_CallocAndAdd(xorNode->results[0], 1,
314 		    rf_RaidAddressToByte(raidPtr,
315 		    raidPtr->Layout.sectorsPerStripeUnit),
316 		    (void *), allocList);
317 	} else {
318 		xorNode->results[0] = rodNodes[i].params[1].p;
319 	}
320 
321 	/* Initialize the Wnp node. */
322 	rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc,
323 	    rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
324 	    dag_h, "Wnp", allocList);
325 	wnpNode->params[0].p = asmap->parityInfo;
326 	wnpNode->params[1].p = xorNode->results[0];
327 	wnpNode->params[2].v = parityStripeID;
328 	wnpNode->params[3].v =
329 	    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
330 	/* parityInfo must describe entire parity unit. */
331 	RF_ASSERT(asmap->parityInfo->next == NULL);
332 
333 	if (nfaults == 2) {
334 		/*
335 		 * We never try to recycle a buffer for the Q calculation
336 		 * in addition to the parity. This would cause two buffers
337 		 * to get smashed during the P and Q calculation, guaranteeing
338 		 * one would be wrong.
339 		 */
340 		RF_CallocAndAdd(xorNode->results[1], 1,
341 		    rf_RaidAddressToByte(raidPtr,
342 		     raidPtr->Layout.sectorsPerStripeUnit),
343 		    (void *), allocList);
344 		rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc,
345 		    rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
346 		    dag_h, "Wnq", allocList);
347 		wnqNode->params[0].p = asmap->qInfo;
348 		wnqNode->params[1].p = xorNode->results[1];
349 		wnqNode->params[2].v = parityStripeID;
350 		wnqNode->params[3].v =
351 		    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
352 		/* parityInfo must describe entire parity unit. */
353 		RF_ASSERT(asmap->parityInfo->next == NULL);
354 	}
355 	/*
356 	 * Connect nodes to form graph.
357 	 */
358 
359 	/* Connect dag header to block node. */
360 	RF_ASSERT(blockNode->numAntecedents == 0);
361 	dag_h->succedents[0] = blockNode;
362 
363 	if (nRodNodes > 0) {
364 		/* Connect the block node to the Rod nodes. */
365 		RF_ASSERT(blockNode->numSuccedents == nRodNodes);
366 		RF_ASSERT(xorNode->numAntecedents == nRodNodes);
367 		for (i = 0; i < nRodNodes; i++) {
368 			RF_ASSERT(rodNodes[i].numAntecedents == 1);
369 			blockNode->succedents[i] = &rodNodes[i];
370 			rodNodes[i].antecedents[0] = blockNode;
371 			rodNodes[i].antType[0] = rf_control;
372 
373 			/* Connect the Rod nodes to the Xor node. */
374 			RF_ASSERT(rodNodes[i].numSuccedents == 1);
375 			rodNodes[i].succedents[0] = xorNode;
376 			xorNode->antecedents[i] = &rodNodes[i];
377 			xorNode->antType[i] = rf_trueData;
378 		}
379 	} else {
380 		/* Connect the block node to the Xor node. */
381 		RF_ASSERT(blockNode->numSuccedents == 1);
382 		RF_ASSERT(xorNode->numAntecedents == 1);
383 		blockNode->succedents[0] = xorNode;
384 		xorNode->antecedents[0] = blockNode;
385 		xorNode->antType[0] = rf_control;
386 	}
387 
388 	/* Connect the xor node to the commit node. */
389 	RF_ASSERT(xorNode->numSuccedents == 1);
390 	RF_ASSERT(commitNode->numAntecedents == 1);
391 	xorNode->succedents[0] = commitNode;
392 	commitNode->antecedents[0] = xorNode;
393 	commitNode->antType[0] = rf_control;
394 
395 	/* Connect the commit node to the write nodes. */
396 	RF_ASSERT(commitNode->numSuccedents == nWndNodes + nfaults);
397 	for (i = 0; i < nWndNodes; i++) {
398 		RF_ASSERT(wndNodes->numAntecedents == 1);
399 		commitNode->succedents[i] = &wndNodes[i];
400 		wndNodes[i].antecedents[0] = commitNode;
401 		wndNodes[i].antType[0] = rf_control;
402 	}
403 	RF_ASSERT(wnpNode->numAntecedents == 1);
404 	commitNode->succedents[nWndNodes] = wnpNode;
405 	wnpNode->antecedents[0] = commitNode;
406 	wnpNode->antType[0] = rf_trueData;
407 	if (nfaults == 2) {
408 		RF_ASSERT(wnqNode->numAntecedents == 1);
409 		commitNode->succedents[nWndNodes + 1] = wnqNode;
410 		wnqNode->antecedents[0] = commitNode;
411 		wnqNode->antType[0] = rf_trueData;
412 	}
413 	/* Connect the write nodes to the term node. */
414 	RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
415 	RF_ASSERT(termNode->numSuccedents == 0);
416 	for (i = 0; i < nWndNodes; i++) {
417 		RF_ASSERT(wndNodes->numSuccedents == 1);
418 		wndNodes[i].succedents[0] = termNode;
419 		termNode->antecedents[i] = &wndNodes[i];
420 		termNode->antType[i] = rf_control;
421 	}
422 	RF_ASSERT(wnpNode->numSuccedents == 1);
423 	wnpNode->succedents[0] = termNode;
424 	termNode->antecedents[nWndNodes] = wnpNode;
425 	termNode->antType[nWndNodes] = rf_control;
426 	if (nfaults == 2) {
427 		RF_ASSERT(wnqNode->numSuccedents == 1);
428 		wnqNode->succedents[0] = termNode;
429 		termNode->antecedents[nWndNodes + 1] = wnqNode;
430 		termNode->antType[nWndNodes + 1] = rf_control;
431 	}
432 }
433 /*****************************************************************************
434  *
435  * Create a DAG to perform a small-write operation (either raid 5 or pq),
436  * which is as follows:
437  *
438  * Hdr -> Nil -> Rop -> Xor -> Cmt ----> Wnp [Unp] --> Trm
439  *            \- Rod X      /     \----> Wnd [Und]-/
440  *           [\- Rod X     /       \---> Wnd [Und]-/]
441  *           [\- Roq -> Q /         \--> Wnq [Unq]-/]
442  *
443  * Rop = read old parity
444  * Rod = read old data
445  * Roq = read old "q"
446  * Cmt = commit node
447  * Und = unlock data disk
448  * Unp = unlock parity disk
449  * Unq = unlock q disk
450  * Wnp = write new parity
451  * Wnd = write new data
452  * Wnq = write new "q"
453  * [ ] denotes optional segments in the graph.
454  *
455  * Parameters:	raidPtr	  - description of the physical array
456  *		asmap	  - logical & physical addresses for this access
457  *		bp	  - buffer ptr (holds write data)
458  *		flags	  - general flags (e.g. disk locking)
459  *		allocList - list of memory allocated in DAG creation
460  *		pfuncs	  - list of parity generating functions
461  *		qfuncs	  - list of q generating functions
462  *
463  * A null qfuncs indicates single fault tolerant.
464  *****************************************************************************/
465 
466 void
rf_CommonCreateSmallWriteDAG(RF_Raid_t * raidPtr,RF_AccessStripeMap_t * asmap,RF_DagHeader_t * dag_h,void * bp,RF_RaidAccessFlags_t flags,RF_AllocListElem_t * allocList,RF_RedFuncs_t * pfuncs,RF_RedFuncs_t * qfuncs)467 rf_CommonCreateSmallWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
468     RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
469     RF_AllocListElem_t *allocList, RF_RedFuncs_t *pfuncs, RF_RedFuncs_t *qfuncs)
470 {
471 	RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
472 	RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
473 	RF_DagNode_t *xorNodes, *qNodes, *blockNode, *commitNode, *nodes;
474 	RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
475 	int i, j, nNodes, totalNumNodes, lu_flag;
476 	RF_ReconUnitNum_t which_ru;
477 	int (*func) (RF_DagNode_t *);
478 	int (*undoFunc) (RF_DagNode_t *);
479 	int (*qfunc) (RF_DagNode_t *);
480 	int numDataNodes, numParityNodes;
481 	RF_StripeNum_t parityStripeID;
482 	RF_PhysDiskAddr_t *pda;
483 	char *name, *qname;
484 	long nfaults;
485 
486 	nfaults = qfuncs ? 2 : 1;
487 	lu_flag = (rf_enableAtomicRMW) ? 1 : 0;	/* Lock/unlock flag. */
488 
489 	parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
490 	    asmap->raidAddress, &which_ru);
491 	pda = asmap->physInfo;
492 	numDataNodes = asmap->numStripeUnitsAccessed;
493 	numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
494 
495 	if (rf_dagDebug) {
496 		printf("[Creating small-write DAG]\n");
497 	}
498 	RF_ASSERT(numDataNodes > 0);
499 	dag_h->creator = "SmallWriteDAG";
500 
501 	dag_h->numCommitNodes = 1;
502 	dag_h->numCommits = 0;
503 	dag_h->numSuccedents = 1;
504 
505 	/*
506 	 * DAG creation occurs in four steps:
507 	 * 1. Count the number of nodes in the DAG.
508 	 * 2. Create the nodes.
509 	 * 3. Initialize the nodes.
510 	 * 4. Connect the nodes.
511 	 */
512 
513 	/*
514 	 * Step 1. Compute number of nodes in the graph.
515 	 */
516 
517 	/*
518 	 * Number of nodes: a read and write for each data unit, a redundancy
519 	 * computation node for each parity node (nfaults * nparity), a read
520 	 * and write for each parity unit, a block and commit node (2), a
521 	 * terminate node if atomic RMW, an unlock node for each
522 	 * data/redundancy unit.
523 	 */
524 	totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes)
525 	    + (nfaults * 2 * numParityNodes) + 3;
526 	if (lu_flag) {
527 		totalNumNodes += (numDataNodes + (nfaults * numParityNodes));
528 	}
529 	/*
530 	 * Step 2. Create the nodes.
531 	 */
532 	RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t),
533 	    (RF_DagNode_t *), allocList);
534 	i = 0;
535 	blockNode = &nodes[i];
536 	i += 1;
537 	commitNode = &nodes[i];
538 	i += 1;
539 	readDataNodes = &nodes[i];
540 	i += numDataNodes;
541 	readParityNodes = &nodes[i];
542 	i += numParityNodes;
543 	writeDataNodes = &nodes[i];
544 	i += numDataNodes;
545 	writeParityNodes = &nodes[i];
546 	i += numParityNodes;
547 	xorNodes = &nodes[i];
548 	i += numParityNodes;
549 	termNode = &nodes[i];
550 	i += 1;
551 	if (lu_flag) {
552 		unlockDataNodes = &nodes[i];
553 		i += numDataNodes;
554 		unlockParityNodes = &nodes[i];
555 		i += numParityNodes;
556 	} else {
557 		unlockDataNodes = unlockParityNodes = NULL;
558 	}
559 	if (nfaults == 2) {
560 		readQNodes = &nodes[i];
561 		i += numParityNodes;
562 		writeQNodes = &nodes[i];
563 		i += numParityNodes;
564 		qNodes = &nodes[i];
565 		i += numParityNodes;
566 		if (lu_flag) {
567 			unlockQNodes = &nodes[i];
568 			i += numParityNodes;
569 		} else {
570 			unlockQNodes = NULL;
571 		}
572 	} else {
573 		readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;
574 	}
575 	RF_ASSERT(i == totalNumNodes);
576 
577 	/*
578 	 * Step 3. Initialize the nodes.
579 	 */
580 	/* Initialize block node (Nil). */
581 	nNodes = numDataNodes + (nfaults * numParityNodes);
582 	rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
583 	    rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h,
584 	    "Nil", allocList);
585 
586 	/* Initialize commit node (Cmt). */
587 	rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc,
588 	    rf_NullNodeUndoFunc, NULL, nNodes, (nfaults * numParityNodes),
589 	    0, 0, dag_h, "Cmt", allocList);
590 
591 	/* Initialize terminate node (Trm). */
592 	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
593 	    rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0, dag_h,
594 	    "Trm", allocList);
595 
596 	/* Initialize nodes which read old data (Rod). */
597 	for (i = 0; i < numDataNodes; i++) {
598 		rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE,
599 		    rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
600 		    (nfaults * numParityNodes), 1, 4, 0, dag_h, "Rod",
601 		    allocList);
602 		RF_ASSERT(pda != NULL);
603 		/* Physical disk addr desc. */
604 		readDataNodes[i].params[0].p = pda;
605 		/* Buffer to hold old data. */
606 		readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
607 		    dag_h, pda, allocList);
608 		readDataNodes[i].params[2].v = parityStripeID;
609 		readDataNodes[i].params[3].v =
610 		    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
611 		    lu_flag, 0, which_ru);
612 		pda = pda->next;
613 		for (j = 0; j < readDataNodes[i].numSuccedents; j++) {
614 			readDataNodes[i].propList[j] = NULL;
615 		}
616 	}
617 
618 	/* Initialize nodes which read old parity (Rop). */
619 	pda = asmap->parityInfo;
620 	i = 0;
621 	for (i = 0; i < numParityNodes; i++) {
622 		RF_ASSERT(pda != NULL);
623 		rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE,
624 		    rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
625 		    numParityNodes, 1, 4, 0, dag_h, "Rop", allocList);
626 		readParityNodes[i].params[0].p = pda;
627 		/* Buffer to hold old parity. */
628 		readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
629 		    dag_h, pda, allocList);
630 		readParityNodes[i].params[2].v = parityStripeID;
631 		readParityNodes[i].params[3].v =
632 		    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
633 		    lu_flag, 0, which_ru);
634 		pda = pda->next;
635 		for (j = 0; j < readParityNodes[i].numSuccedents; j++) {
636 			readParityNodes[i].propList[0] = NULL;
637 		}
638 	}
639 
640 	/* Initialize nodes which read old Q (Roq). */
641 	if (nfaults == 2) {
642 		pda = asmap->qInfo;
643 		for (i = 0; i < numParityNodes; i++) {
644 			RF_ASSERT(pda != NULL);
645 			rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE,
646 			    rf_DiskReadFunc, rf_DiskReadUndoFunc,
647 			    rf_GenericWakeupFunc, numParityNodes,
648 			    1, 4, 0, dag_h, "Roq", allocList);
649 			readQNodes[i].params[0].p = pda;
650 			/* Buffer to hold old Q. */
651 			readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
652 			    dag_h, pda, allocList);
653 			readQNodes[i].params[2].v = parityStripeID;
654 			readQNodes[i].params[3].v =
655 			    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
656 			    lu_flag, 0, which_ru);
657 			pda = pda->next;
658 			for (j = 0; j < readQNodes[i].numSuccedents; j++) {
659 				readQNodes[i].propList[0] = NULL;
660 			}
661 		}
662 	}
663 	/* Initialize nodes which write new data (Wnd). */
664 	pda = asmap->physInfo;
665 	for (i = 0; i < numDataNodes; i++) {
666 		RF_ASSERT(pda != NULL);
667 		rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE,
668 		    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
669 		    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
670 		    "Wnd", allocList);
671 		/* Physical disk addr desc. */
672 		writeDataNodes[i].params[0].p = pda;
673 		/* Buffer holding new data to be written. */
674 		writeDataNodes[i].params[1].p = pda->bufPtr;
675 		writeDataNodes[i].params[2].v = parityStripeID;
676 		writeDataNodes[i].params[3].v =
677 		    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
678 		if (lu_flag) {
679 			/* Initialize node to unlock the disk queue. */
680 			rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE,
681 			    rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc,
682 			    rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
683 			    "Und", allocList);
684 			/* Physical disk addr desc. */
685 			unlockDataNodes[i].params[0].p = pda;
686 			unlockDataNodes[i].params[1].v =
687 			    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
688 			    0, lu_flag, which_ru);
689 		}
690 		pda = pda->next;
691 	}
692 
693 	/*
694 	 * Initialize nodes which compute new parity and Q.
695 	 */
696 	/*
697 	 * We use the simple XOR func in the double-XOR case, and when
698 	 * we're accessing only a portion of one stripe unit.
699 	 * The distinction between the two is that the regular XOR func
700 	 * assumes that the targbuf is a full SU in size, and examines
701 	 * the pda associated with the buffer to decide where within
702 	 * the buffer to XOR the data, whereas the simple XOR func just
703 	 * XORs the data into the start of the buffer.
704 	 */
705 	if ((numParityNodes == 2) || ((numDataNodes == 1) &&
706 	    (asmap->totalSectorsAccessed <
707 	     raidPtr->Layout.sectorsPerStripeUnit))) {
708 		func = pfuncs->simple;
709 		undoFunc = rf_NullNodeUndoFunc;
710 		name = pfuncs->SimpleName;
711 		if (qfuncs) {
712 			qfunc = qfuncs->simple;
713 			qname = qfuncs->SimpleName;
714 		} else {
715 			qfunc = NULL;
716 			qname = NULL;
717 		}
718 	} else {
719 		func = pfuncs->regular;
720 		undoFunc = rf_NullNodeUndoFunc;
721 		name = pfuncs->RegularName;
722 		if (qfuncs) {
723 			qfunc = qfuncs->regular;
724 			qname = qfuncs->RegularName;
725 		} else {
726 			qfunc = NULL;
727 			qname = NULL;
728 		}
729 	}
730 	/*
731 	 * Initialize the xor nodes: params are {pda,buf}.
732 	 * From {Rod,Wnd,Rop} nodes, and raidPtr.
733 	 */
734 	if (numParityNodes == 2) {
735 		/* Double-xor case. */
736 		for (i = 0; i < numParityNodes; i++) {
737 			/* Note: no wakeup func for xor. */
738 			rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func,
739 			    undoFunc, NULL, 1, (numDataNodes + numParityNodes),
740 			    7, 1, dag_h, name, allocList);
741 			xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
742 			xorNodes[i].params[0] = readDataNodes[i].params[0];
743 			xorNodes[i].params[1] = readDataNodes[i].params[1];
744 			xorNodes[i].params[2] = readParityNodes[i].params[0];
745 			xorNodes[i].params[3] = readParityNodes[i].params[1];
746 			xorNodes[i].params[4] = writeDataNodes[i].params[0];
747 			xorNodes[i].params[5] = writeDataNodes[i].params[1];
748 			xorNodes[i].params[6].p = raidPtr;
749 			/* Use old parity buf as target buf. */
750 			xorNodes[i].results[0] = readParityNodes[i].params[1].p;
751 			if (nfaults == 2) {
752 				/* Note: no wakeup func for qor. */
753 				rf_InitNode(&qNodes[i], rf_wait, RF_FALSE,
754 				    qfunc, undoFunc, NULL, 1,
755 				    (numDataNodes + numParityNodes), 7, 1,
756 				    dag_h, qname, allocList);
757 				qNodes[i].params[0] =
758 				    readDataNodes[i].params[0];
759 				qNodes[i].params[1] =
760 				    readDataNodes[i].params[1];
761 				qNodes[i].params[2] = readQNodes[i].params[0];
762 				qNodes[i].params[3] = readQNodes[i].params[1];
763 				qNodes[i].params[4] =
764 				    writeDataNodes[i].params[0];
765 				qNodes[i].params[5] =
766 				    writeDataNodes[i].params[1];
767 				qNodes[i].params[6].p = raidPtr;
768 				/* Use old Q buf as target buf. */
769 				qNodes[i].results[0] =
770 				    readQNodes[i].params[1].p;
771 			}
772 		}
773 	} else {
774 		/* There is only one xor node in this case. */
775 		rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc,
776 		    NULL, 1, (numDataNodes + numParityNodes),
777 		    (2 * (numDataNodes + numDataNodes + 1) + 1), 1,
778 		    dag_h, name, allocList);
779 		xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
780 		for (i = 0; i < numDataNodes + 1; i++) {
781 			/* Set up params related to Rod and Rop nodes. */
782 			xorNodes[0].params[2 * i + 0] =
783 			    readDataNodes[i].params[0];	/* pda */
784 			xorNodes[0].params[2 * i + 1] =
785 			    readDataNodes[i].params[1];	/* buffer ptr */
786 		}
787 		for (i = 0; i < numDataNodes; i++) {
788 			/* Set up params related to Wnd and Wnp nodes. */
789 			xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] =
790 			    writeDataNodes[i].params[0];	/* pda */
791 			xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] =
792 			    writeDataNodes[i].params[1];	/* buffer ptr */
793 		}
794 		/* Xor node needs to get at RAID information. */
795 		xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p =
796 		    raidPtr;
797 		xorNodes[0].results[0] = readParityNodes[0].params[1].p;
798 		if (nfaults == 2) {
799 			rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc,
800 			    undoFunc, NULL, 1, (numDataNodes + numParityNodes),
801 			    (2 * (numDataNodes + numDataNodes + 1) + 1), 1,
802 			    dag_h, qname, allocList);
803 			for (i = 0; i < numDataNodes; i++) {
804 				/* Set up params related to Rod. */
805 				qNodes[0].params[2 * i + 0] =
806 				    readDataNodes[i].params[0];	/* pda */
807 				qNodes[0].params[2 * i + 1] =
808 				    readDataNodes[i].params[1];	/* buffer ptr */
809 			}
810 			/* And read old q. */
811 			qNodes[0].params[2 * numDataNodes + 0] =
812 			    readQNodes[0].params[0];	/* pda */
813 			qNodes[0].params[2 * numDataNodes + 1] =
814 			    readQNodes[0].params[1];	/* buffer ptr */
815 			for (i = 0; i < numDataNodes; i++) {
816 				/* Set up params related to Wnd nodes. */
817 				qNodes[0].params
818 				    [2 * (numDataNodes + 1 + i) + 0] =
819 				    /* pda */
820 				    writeDataNodes[i].params[0];
821 				qNodes[0].params
822 				    [2 * (numDataNodes + 1 + i) + 1] =
823 				    /* buffer ptr */
824 				    writeDataNodes[i].params[1];
825 			}
826 			/* Xor node needs to get at RAID information. */
827 			qNodes[0].params
828 			    [2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;
829 			qNodes[0].results[0] = readQNodes[0].params[1].p;
830 		}
831 	}
832 
833 	/* Initialize nodes which write new parity (Wnp). */
834 	pda = asmap->parityInfo;
835 	for (i = 0; i < numParityNodes; i++) {
836 		rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE,
837 		    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
838 		    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
839 		    "Wnp", allocList);
840 		RF_ASSERT(pda != NULL);
841 		/* Param 1 (bufPtr) filled in by xor node. */
842 		writeParityNodes[i].params[0].p = pda;
843 		/* Buffer pointer for parity write operation. */
844 		writeParityNodes[i].params[1].p = xorNodes[i].results[0];
845 		writeParityNodes[i].params[2].v = parityStripeID;
846 		writeParityNodes[i].params[3].v =
847 		    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
848 		if (lu_flag) {
849 			/* Initialize node to unlock the disk queue. */
850 			rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE,
851 			    rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc,
852 			    rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
853 			    "Unp", allocList);
854 			/* Physical disk addr desc. */
855 			unlockParityNodes[i].params[0].p = pda;
856 			unlockParityNodes[i].params[1].v =
857 			    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
858 			    0, lu_flag, which_ru);
859 		}
860 		pda = pda->next;
861 	}
862 
863 	/* Initialize nodes which write new Q (Wnq). */
864 	if (nfaults == 2) {
865 		pda = asmap->qInfo;
866 		for (i = 0; i < numParityNodes; i++) {
867 			rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE,
868 			    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
869 			    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
870 			    "Wnq", allocList);
871 			RF_ASSERT(pda != NULL);
872 			/* Param 1 (bufPtr) filled in by xor node. */
873 			writeQNodes[i].params[0].p = pda;
874 			writeQNodes[i].params[1].p = qNodes[i].results[0];
875 			/* Buffer pointer for parity write operation. */
876 			writeQNodes[i].params[2].v = parityStripeID;
877 			writeQNodes[i].params[3].v =
878 			    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
879 			    0, 0, which_ru);
880 			if (lu_flag) {
881 				/* Initialize node to unlock the disk queue. */
882 				rf_InitNode(&unlockQNodes[i], rf_wait,
883 				    RF_FALSE, rf_DiskUnlockFunc,
884 				    rf_DiskUnlockUndoFunc,
885 				    rf_GenericWakeupFunc, 1, 1, 2, 0,
886 				    dag_h, "Unq", allocList);
887 				/* Physical disk addr desc. */
888 				unlockQNodes[i].params[0].p = pda;
889 				unlockQNodes[i].params[1].v =
890 				    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
891 				    0, lu_flag, which_ru);
892 			}
893 			pda = pda->next;
894 		}
895 	}
896 	/*
897 	 * Step 4. Connect the nodes.
898 	 */
899 
900 	/* Connect header to block node. */
901 	dag_h->succedents[0] = blockNode;
902 
903 	/* Connect block node to read old data nodes. */
904 	RF_ASSERT(blockNode->numSuccedents ==
905 	    (numDataNodes + (numParityNodes * nfaults)));
906 	for (i = 0; i < numDataNodes; i++) {
907 		blockNode->succedents[i] = &readDataNodes[i];
908 		RF_ASSERT(readDataNodes[i].numAntecedents == 1);
909 		readDataNodes[i].antecedents[0] = blockNode;
910 		readDataNodes[i].antType[0] = rf_control;
911 	}
912 
913 	/* Connect block node to read old parity nodes. */
914 	for (i = 0; i < numParityNodes; i++) {
915 		blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
916 		RF_ASSERT(readParityNodes[i].numAntecedents == 1);
917 		readParityNodes[i].antecedents[0] = blockNode;
918 		readParityNodes[i].antType[0] = rf_control;
919 	}
920 
921 	/* Connect block node to read old Q nodes. */
922 	if (nfaults == 2) {
923 		for (i = 0; i < numParityNodes; i++) {
924 			blockNode->succedents[numDataNodes + numParityNodes + i]
925 			    = &readQNodes[i];
926 			RF_ASSERT(readQNodes[i].numAntecedents == 1);
927 			readQNodes[i].antecedents[0] = blockNode;
928 			readQNodes[i].antType[0] = rf_control;
929 		}
930 	}
931 	/* Connect read old data nodes to xor nodes. */
932 	for (i = 0; i < numDataNodes; i++) {
933 		RF_ASSERT(readDataNodes[i].numSuccedents ==
934 		    (nfaults * numParityNodes));
935 		for (j = 0; j < numParityNodes; j++) {
936 			RF_ASSERT(xorNodes[j].numAntecedents ==
937 			    numDataNodes + numParityNodes);
938 			readDataNodes[i].succedents[j] = &xorNodes[j];
939 			xorNodes[j].antecedents[i] = &readDataNodes[i];
940 			xorNodes[j].antType[i] = rf_trueData;
941 		}
942 	}
943 
944 	/* Connect read old data nodes to q nodes. */
945 	if (nfaults == 2) {
946 		for (i = 0; i < numDataNodes; i++) {
947 			for (j = 0; j < numParityNodes; j++) {
948 				RF_ASSERT(qNodes[j].numAntecedents ==
949 				    numDataNodes + numParityNodes);
950 				readDataNodes[i].succedents[numParityNodes + j]
951 				    = &qNodes[j];
952 				qNodes[j].antecedents[i] = &readDataNodes[i];
953 				qNodes[j].antType[i] = rf_trueData;
954 			}
955 		}
956 	}
957 	/* Connect read old parity nodes to xor nodes. */
958 	for (i = 0; i < numParityNodes; i++) {
959 		RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
960 		for (j = 0; j < numParityNodes; j++) {
961 			readParityNodes[i].succedents[j] = &xorNodes[j];
962 			xorNodes[j].antecedents[numDataNodes + i] =
963 			    &readParityNodes[i];
964 			xorNodes[j].antType[numDataNodes + i] = rf_trueData;
965 		}
966 	}
967 
968 	/* Connect read old q nodes to q nodes. */
969 	if (nfaults == 2) {
970 		for (i = 0; i < numParityNodes; i++) {
971 			RF_ASSERT(readParityNodes[i].numSuccedents ==
972 			    numParityNodes);
973 			for (j = 0; j < numParityNodes; j++) {
974 				readQNodes[i].succedents[j] = &qNodes[j];
975 				qNodes[j].antecedents[numDataNodes + i] =
976 				    &readQNodes[i];
977 				qNodes[j].antType[numDataNodes + i] =
978 				    rf_trueData;
979 			}
980 		}
981 	}
982 	/* Connect xor nodes to commit node. */
983 	RF_ASSERT(commitNode->numAntecedents == (nfaults * numParityNodes));
984 	for (i = 0; i < numParityNodes; i++) {
985 		RF_ASSERT(xorNodes[i].numSuccedents == 1);
986 		xorNodes[i].succedents[0] = commitNode;
987 		commitNode->antecedents[i] = &xorNodes[i];
988 		commitNode->antType[i] = rf_control;
989 	}
990 
991 	/* Connect q nodes to commit node. */
992 	if (nfaults == 2) {
993 		for (i = 0; i < numParityNodes; i++) {
994 			RF_ASSERT(qNodes[i].numSuccedents == 1);
995 			qNodes[i].succedents[0] = commitNode;
996 			commitNode->antecedents[i + numParityNodes] =
997 			    &qNodes[i];
998 			commitNode->antType[i + numParityNodes] = rf_control;
999 		}
1000 	}
1001 	/* Connect commit node to write nodes. */
1002 	RF_ASSERT(commitNode->numSuccedents ==
1003 	    (numDataNodes + (nfaults * numParityNodes)));
1004 	for (i = 0; i < numDataNodes; i++) {
1005 		RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
1006 		commitNode->succedents[i] = &writeDataNodes[i];
1007 		writeDataNodes[i].antecedents[0] = commitNode;
1008 		writeDataNodes[i].antType[0] = rf_trueData;
1009 	}
1010 	for (i = 0; i < numParityNodes; i++) {
1011 		RF_ASSERT(writeParityNodes[i].numAntecedents == 1);
1012 		commitNode->succedents[i + numDataNodes] = &writeParityNodes[i];
1013 		writeParityNodes[i].antecedents[0] = commitNode;
1014 		writeParityNodes[i].antType[0] = rf_trueData;
1015 	}
1016 	if (nfaults == 2) {
1017 		for (i = 0; i < numParityNodes; i++) {
1018 			RF_ASSERT(writeQNodes[i].numAntecedents == 1);
1019 			commitNode->succedents
1020 			    [i + numDataNodes + numParityNodes] =
1021 			    &writeQNodes[i];
1022 			writeQNodes[i].antecedents[0] = commitNode;
1023 			writeQNodes[i].antType[0] = rf_trueData;
1024 		}
1025 	}
1026 	RF_ASSERT(termNode->numAntecedents ==
1027 	    (numDataNodes + (nfaults * numParityNodes)));
1028 	RF_ASSERT(termNode->numSuccedents == 0);
1029 	for (i = 0; i < numDataNodes; i++) {
1030 		if (lu_flag) {
1031 			/* Connect write new data nodes to unlock nodes. */
1032 			RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
1033 			RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
1034 			writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
1035 			unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
1036 			unlockDataNodes[i].antType[0] = rf_control;
1037 
1038 			/* Connect unlock nodes to term node. */
1039 			RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
1040 			unlockDataNodes[i].succedents[0] = termNode;
1041 			termNode->antecedents[i] = &unlockDataNodes[i];
1042 			termNode->antType[i] = rf_control;
1043 		} else {
1044 			/* Connect write new data nodes to term node. */
1045 			RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
1046 			RF_ASSERT(termNode->numAntecedents ==
1047 			    (numDataNodes + (nfaults * numParityNodes)));
1048 			writeDataNodes[i].succedents[0] = termNode;
1049 			termNode->antecedents[i] = &writeDataNodes[i];
1050 			termNode->antType[i] = rf_control;
1051 		}
1052 	}
1053 
1054 	for (i = 0; i < numParityNodes; i++) {
1055 		if (lu_flag) {
1056 			/* Connect write new parity nodes to unlock nodes. */
1057 			RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
1058 			RF_ASSERT(unlockParityNodes[i].numAntecedents == 1);
1059 			writeParityNodes[i].succedents[0] =
1060 			    &unlockParityNodes[i];
1061 			unlockParityNodes[i].antecedents[0] =
1062 			    &writeParityNodes[i];
1063 			unlockParityNodes[i].antType[0] = rf_control;
1064 
1065 			/* Connect unlock nodes to term node. */
1066 			RF_ASSERT(unlockParityNodes[i].numSuccedents == 1);
1067 			unlockParityNodes[i].succedents[0] = termNode;
1068 			termNode->antecedents[numDataNodes + i] =
1069 			    &unlockParityNodes[i];
1070 			termNode->antType[numDataNodes + i] = rf_control;
1071 		} else {
1072 			RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
1073 			writeParityNodes[i].succedents[0] = termNode;
1074 			termNode->antecedents[numDataNodes + i] =
1075 			    &writeParityNodes[i];
1076 			termNode->antType[numDataNodes + i] = rf_control;
1077 		}
1078 	}
1079 
1080 	if (nfaults == 2) {
1081 		for (i = 0; i < numParityNodes; i++) {
1082 			if (lu_flag) {
1083 				/* Connect write new Q nodes to unlock nodes. */
1084 				RF_ASSERT(writeQNodes[i].numSuccedents == 1);
1085 				RF_ASSERT(unlockQNodes[i].numAntecedents == 1);
1086 				writeQNodes[i].succedents[0] = &unlockQNodes[i];
1087 				unlockQNodes[i].antecedents[0] =
1088 				    &writeQNodes[i];
1089 				unlockQNodes[i].antType[0] = rf_control;
1090 
1091 				/* Connect unlock nodes to unblock node. */
1092 				RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
1093 				unlockQNodes[i].succedents[0] = termNode;
1094 				termNode->antecedents
1095 				    [numDataNodes + numParityNodes + i] =
1096 				    &unlockQNodes[i];
1097 				termNode->antType
1098 				    [numDataNodes + numParityNodes + i] =
1099 				    rf_control;
1100 			} else {
1101 				RF_ASSERT(writeQNodes[i].numSuccedents == 1);
1102 				writeQNodes[i].succedents[0] = termNode;
1103 				termNode->antecedents
1104 				    [numDataNodes + numParityNodes + i] =
1105 				    &writeQNodes[i];
1106 				termNode->antType
1107 				    [numDataNodes + numParityNodes + i] =
1108 				    rf_control;
1109 			}
1110 		}
1111 	}
1112 }
1113 
1114 
1115 /*****************************************************************************
1116  * Create a write graph (fault-free or degraded) for RAID level 1.
1117  *
1118  * Hdr -> Commit -> Wpd -> Nil -> Trm
1119  *		 -> Wsd ->
1120  *
1121  * The "Wpd" node writes data to the primary copy in the mirror pair.
1122  * The "Wsd" node writes data to the secondary copy in the mirror pair.
1123  *
1124  * Parameters:	raidPtr	  - description of the physical array
1125  *		asmap	  - logical & physical addresses for this access
1126  *		bp	  - buffer ptr (holds write data)
1127  *		flags	  - general flags (e.g. disk locking)
1128  *		allocList - list of memory allocated in DAG creation
1129  *****************************************************************************/
1130 
1131 void
rf_CreateRaidOneWriteDAG(RF_Raid_t * raidPtr,RF_AccessStripeMap_t * asmap,RF_DagHeader_t * dag_h,void * bp,RF_RaidAccessFlags_t flags,RF_AllocListElem_t * allocList)1132 rf_CreateRaidOneWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
1133     RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
1134     RF_AllocListElem_t *allocList)
1135 {
1136 	RF_DagNode_t *unblockNode, *termNode, *commitNode;
1137 	RF_DagNode_t *nodes, *wndNode, *wmirNode;
1138 	int nWndNodes, nWmirNodes, i;
1139 	RF_ReconUnitNum_t which_ru;
1140 	RF_PhysDiskAddr_t *pda, *pdaP;
1141 	RF_StripeNum_t parityStripeID;
1142 
1143 	parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
1144 	    asmap->raidAddress, &which_ru);
1145 	if (rf_dagDebug) {
1146 		printf("[Creating RAID level 1 write DAG]\n");
1147 	}
1148 	dag_h->creator = "RaidOneWriteDAG";
1149 
1150 	/* 2 implies access not SU aligned. */
1151 	nWmirNodes = (asmap->parityInfo->next) ? 2 : 1;
1152 	nWndNodes = (asmap->physInfo->next) ? 2 : 1;
1153 
1154 	/* Alloc the Wnd nodes and the Wmir node. */
1155 	if (asmap->numDataFailed == 1)
1156 		nWndNodes--;
1157 	if (asmap->numParityFailed == 1)
1158 		nWmirNodes--;
1159 
1160 	/*
1161 	 * Total number of nodes = nWndNodes + nWmirNodes
1162 	 * + (commit + unblock + terminator)
1163 	 */
1164 	RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3, sizeof(RF_DagNode_t),
1165 	    (RF_DagNode_t *), allocList);
1166 	i = 0;
1167 	wndNode = &nodes[i];
1168 	i += nWndNodes;
1169 	wmirNode = &nodes[i];
1170 	i += nWmirNodes;
1171 	commitNode = &nodes[i];
1172 	i += 1;
1173 	unblockNode = &nodes[i];
1174 	i += 1;
1175 	termNode = &nodes[i];
1176 	i += 1;
1177 	RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));
1178 
1179 	/* This dag can commit immediately. */
1180 	dag_h->numCommitNodes = 1;
1181 	dag_h->numCommits = 0;
1182 	dag_h->numSuccedents = 1;
1183 
1184 	/* Initialize the commit, unblock, and term nodes. */
1185 	rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc,
1186 	    rf_NullNodeUndoFunc, NULL, (nWndNodes + nWmirNodes), 0, 0, 0,
1187 	    dag_h, "Cmt", allocList);
1188 	rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
1189 	    rf_NullNodeUndoFunc, NULL, 1, (nWndNodes + nWmirNodes), 0, 0,
1190 	    dag_h, "Nil", allocList);
1191 	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
1192 	    rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
1193 
1194 	/* Initialize the wnd nodes. */
1195 	if (nWndNodes > 0) {
1196 		pda = asmap->physInfo;
1197 		for (i = 0; i < nWndNodes; i++) {
1198 			rf_InitNode(&wndNode[i], rf_wait, RF_FALSE,
1199 			    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1200 			    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
1201 			    "Wpd", allocList);
1202 			RF_ASSERT(pda != NULL);
1203 			wndNode[i].params[0].p = pda;
1204 			wndNode[i].params[1].p = pda->bufPtr;
1205 			wndNode[i].params[2].v = parityStripeID;
1206 			wndNode[i].params[3].v =
1207 			    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1208 			    0, 0, which_ru);
1209 			pda = pda->next;
1210 		}
1211 		RF_ASSERT(pda == NULL);
1212 	}
1213 	/* Initialize the mirror nodes. */
1214 	if (nWmirNodes > 0) {
1215 		pda = asmap->physInfo;
1216 		pdaP = asmap->parityInfo;
1217 		for (i = 0; i < nWmirNodes; i++) {
1218 			rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE,
1219 			    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1220 			    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
1221 			    "Wsd", allocList);
1222 			RF_ASSERT(pda != NULL);
1223 			wmirNode[i].params[0].p = pdaP;
1224 			wmirNode[i].params[1].p = pda->bufPtr;
1225 			wmirNode[i].params[2].v = parityStripeID;
1226 			wmirNode[i].params[3].v =
1227 			    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1228 			    0, 0, which_ru);
1229 			pda = pda->next;
1230 			pdaP = pdaP->next;
1231 		}
1232 		RF_ASSERT(pda == NULL);
1233 		RF_ASSERT(pdaP == NULL);
1234 	}
1235 	/* Link the header node to the commit node. */
1236 	RF_ASSERT(dag_h->numSuccedents == 1);
1237 	RF_ASSERT(commitNode->numAntecedents == 0);
1238 	dag_h->succedents[0] = commitNode;
1239 
1240 	/* Link the commit node to the write nodes. */
1241 	RF_ASSERT(commitNode->numSuccedents == (nWndNodes + nWmirNodes));
1242 	for (i = 0; i < nWndNodes; i++) {
1243 		RF_ASSERT(wndNode[i].numAntecedents == 1);
1244 		commitNode->succedents[i] = &wndNode[i];
1245 		wndNode[i].antecedents[0] = commitNode;
1246 		wndNode[i].antType[0] = rf_control;
1247 	}
1248 	for (i = 0; i < nWmirNodes; i++) {
1249 		RF_ASSERT(wmirNode[i].numAntecedents == 1);
1250 		commitNode->succedents[i + nWndNodes] = &wmirNode[i];
1251 		wmirNode[i].antecedents[0] = commitNode;
1252 		wmirNode[i].antType[0] = rf_control;
1253 	}
1254 
1255 	/* Link the write nodes to the unblock node. */
1256 	RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
1257 	for (i = 0; i < nWndNodes; i++) {
1258 		RF_ASSERT(wndNode[i].numSuccedents == 1);
1259 		wndNode[i].succedents[0] = unblockNode;
1260 		unblockNode->antecedents[i] = &wndNode[i];
1261 		unblockNode->antType[i] = rf_control;
1262 	}
1263 	for (i = 0; i < nWmirNodes; i++) {
1264 		RF_ASSERT(wmirNode[i].numSuccedents == 1);
1265 		wmirNode[i].succedents[0] = unblockNode;
1266 		unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
1267 		unblockNode->antType[i + nWndNodes] = rf_control;
1268 	}
1269 
1270 	/* Link the unblock node to the term node. */
1271 	RF_ASSERT(unblockNode->numSuccedents == 1);
1272 	RF_ASSERT(termNode->numAntecedents == 1);
1273 	RF_ASSERT(termNode->numSuccedents == 0);
1274 	unblockNode->succedents[0] = termNode;
1275 	termNode->antecedents[0] = unblockNode;
1276 	termNode->antType[0] = rf_control;
1277 }
1278 
1279 
1280 
1281 /*
1282  * DAGs that have no commit points.
1283  *
1284  * The following DAGs are used in forward and backward error recovery
1285  * experiments.
1286  * They are identical to the DAGs above this comment with the exception that
1287  * the commit points have been removed.
1288  */
1289 
1290 
1291 void
rf_CommonCreateLargeWriteDAGFwd(RF_Raid_t * raidPtr,RF_AccessStripeMap_t * asmap,RF_DagHeader_t * dag_h,void * bp,RF_RaidAccessFlags_t flags,RF_AllocListElem_t * allocList,int nfaults,int (* redFunc)(RF_DagNode_t *),int allowBufferRecycle)1292 rf_CommonCreateLargeWriteDAGFwd(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
1293     RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
1294     RF_AllocListElem_t *allocList, int nfaults, int (*redFunc) (RF_DagNode_t *),
1295     int allowBufferRecycle)
1296 {
1297 	RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
1298 	RF_DagNode_t *wnqNode, *blockNode, *syncNode, *termNode;
1299 	int nWndNodes, nRodNodes, i, nodeNum, asmNum;
1300 	RF_AccessStripeMapHeader_t *new_asm_h[2];
1301 	RF_StripeNum_t parityStripeID;
1302 	char *sosBuffer, *eosBuffer;
1303 	RF_ReconUnitNum_t which_ru;
1304 	RF_RaidLayout_t *layoutPtr;
1305 	RF_PhysDiskAddr_t *pda;
1306 
1307 	layoutPtr = &(raidPtr->Layout);
1308 	parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
1309 	    asmap->raidAddress, &which_ru);
1310 
1311 	if (rf_dagDebug)
1312 		printf("[Creating large-write DAG]\n");
1313 	dag_h->creator = "LargeWriteDAGFwd";
1314 
1315 	dag_h->numCommitNodes = 0;
1316 	dag_h->numCommits = 0;
1317 	dag_h->numSuccedents = 1;
1318 
1319 	/* Alloc the nodes: Wnd, xor, commit, block, term, and  Wnp. */
1320 	nWndNodes = asmap->numStripeUnitsAccessed;
1321 	RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t),
1322 	    (RF_DagNode_t *), allocList);
1323 	i = 0;
1324 	wndNodes = &nodes[i];
1325 	i += nWndNodes;
1326 	xorNode = &nodes[i];
1327 	i += 1;
1328 	wnpNode = &nodes[i];
1329 	i += 1;
1330 	blockNode = &nodes[i];
1331 	i += 1;
1332 	syncNode = &nodes[i];
1333 	i += 1;
1334 	termNode = &nodes[i];
1335 	i += 1;
1336 	if (nfaults == 2) {
1337 		wnqNode = &nodes[i];
1338 		i += 1;
1339 	} else {
1340 		wnqNode = NULL;
1341 	}
1342 	rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h,
1343 	    new_asm_h, &nRodNodes, &sosBuffer, &eosBuffer, allocList);
1344 	if (nRodNodes > 0) {
1345 		RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t),
1346 		    (RF_DagNode_t *), allocList);
1347 	} else {
1348 		rodNodes = NULL;
1349 	}
1350 
1351 	/* Begin node initialization. */
1352 	if (nRodNodes > 0) {
1353 		rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
1354 		    rf_NullNodeUndoFunc, NULL, nRodNodes, 0, 0, 0, dag_h,
1355 		    "Nil", allocList);
1356 		rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
1357 		    rf_NullNodeUndoFunc, NULL, nWndNodes + 1, nRodNodes, 0, 0,
1358 		    dag_h, "Nil", allocList);
1359 	} else {
1360 		rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
1361 		    rf_NullNodeUndoFunc, NULL, 1, 0, 0, 0, dag_h, "Nil",
1362 		    allocList);
1363 		rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
1364 		    rf_NullNodeUndoFunc, NULL, nWndNodes + 1, 1, 0, 0, dag_h,
1365 		    "Nil", allocList);
1366 	}
1367 
1368 	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
1369 	    rf_TerminateUndoFunc, NULL, 0, nWndNodes + nfaults, 0, 0,
1370 	    dag_h, "Trm", allocList);
1371 
1372 	/* Initialize the Rod nodes. */
1373 	for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
1374 		if (new_asm_h[asmNum]) {
1375 			pda = new_asm_h[asmNum]->stripeMap->physInfo;
1376 			while (pda) {
1377 				rf_InitNode(&rodNodes[nodeNum], rf_wait,
1378 				    RF_FALSE, rf_DiskReadFunc,
1379 				    rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
1380 				    1, 1, 4, 0, dag_h, "Rod", allocList);
1381 				rodNodes[nodeNum].params[0].p = pda;
1382 				rodNodes[nodeNum].params[1].p = pda->bufPtr;
1383 				rodNodes[nodeNum].params[2].v = parityStripeID;
1384 				rodNodes[nodeNum].params[3].v =
1385 				    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1386 				    0, 0, which_ru);
1387 				nodeNum++;
1388 				pda = pda->next;
1389 			}
1390 		}
1391 	}
1392 	RF_ASSERT(nodeNum == nRodNodes);
1393 
1394 	/* Initialize the wnd nodes. */
1395 	pda = asmap->physInfo;
1396 	for (i = 0; i < nWndNodes; i++) {
1397 		rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
1398 		    rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
1399 		    dag_h, "Wnd", allocList);
1400 		RF_ASSERT(pda != NULL);
1401 		wndNodes[i].params[0].p = pda;
1402 		wndNodes[i].params[1].p = pda->bufPtr;
1403 		wndNodes[i].params[2].v = parityStripeID;
1404 		wndNodes[i].params[3].v =
1405 		    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1406 		pda = pda->next;
1407 	}
1408 
1409 	/* Initialize the redundancy node. */
1410 	rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc,
1411 	    NULL, 1, nfaults, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h,
1412 	    "Xr ", allocList);
1413 	xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
1414 	for (i = 0; i < nWndNodes; i++) {
1415 		xorNode->params[2 * i + 0] =
1416 		    wndNodes[i].params[0];	/* pda */
1417 		xorNode->params[2 * i + 1] =
1418 		    wndNodes[i].params[1];	/* buf ptr */
1419 	}
1420 	for (i = 0; i < nRodNodes; i++) {
1421 		xorNode->params[2 * (nWndNodes + i) + 0] =
1422 		    rodNodes[i].params[0];	/* pda */
1423 		xorNode->params[2 * (nWndNodes + i) + 1] =
1424 		    rodNodes[i].params[1];	/* buf ptr */
1425 	}
1426 	/* Xor node needs to get at RAID information. */
1427 	xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr;
1428 
1429 	/*
1430 	 * Look for an Rod node that reads a complete SU. If none, alloc a
1431 	 * buffer to receive the parity info. Note that we can't use a new
1432 	 * data buffer because it will not have gotten written when the xor
1433 	 * occurs.
1434 	 */
1435 	if (allowBufferRecycle) {
1436 		for (i = 0; i < nRodNodes; i++)
1437 			if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)
1438 			    ->numSector == raidPtr->Layout.sectorsPerStripeUnit)
1439 				break;
1440 	}
1441 	if ((!allowBufferRecycle) || (i == nRodNodes)) {
1442 		RF_CallocAndAdd(xorNode->results[0], 1,
1443 		    rf_RaidAddressToByte(raidPtr,
1444 		    raidPtr->Layout.sectorsPerStripeUnit),
1445 		    (void *), allocList);
1446 	} else
1447 		xorNode->results[0] = rodNodes[i].params[1].p;
1448 
1449 	/* Initialize the Wnp node. */
1450 	rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc,
1451 	    rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
1452 	    dag_h, "Wnp", allocList);
1453 	wnpNode->params[0].p = asmap->parityInfo;
1454 	wnpNode->params[1].p = xorNode->results[0];
1455 	wnpNode->params[2].v = parityStripeID;
1456 	wnpNode->params[3].v =
1457 	    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1458 	/* parityInfo must describe entire parity unit. */
1459 	RF_ASSERT(asmap->parityInfo->next == NULL);
1460 
1461 	if (nfaults == 2) {
1462 		/*
1463 		 * Never try to recycle a buffer for the Q calcuation in
1464 		 * addition to the parity. This would cause two buffers to
1465 		 * get smashed during the P and Q calculation, guaranteeing
1466 		 * one would be wrong.
1467 		 */
1468 		RF_CallocAndAdd(xorNode->results[1], 1,
1469 		    rf_RaidAddressToByte(raidPtr,
1470 		    raidPtr->Layout.sectorsPerStripeUnit),
1471 		    (void *), allocList);
1472 		rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc,
1473 		    rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
1474 		    dag_h, "Wnq", allocList);
1475 		wnqNode->params[0].p = asmap->qInfo;
1476 		wnqNode->params[1].p = xorNode->results[1];
1477 		wnqNode->params[2].v = parityStripeID;
1478 		wnqNode->params[3].v =
1479 		    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1480 		/* parityInfo must describe entire parity unit. */
1481 		RF_ASSERT(asmap->parityInfo->next == NULL);
1482 	}
1483 
1484 	/* Connect nodes to form graph. */
1485 
1486 	/* Connect dag header to block node. */
1487 	RF_ASSERT(blockNode->numAntecedents == 0);
1488 	dag_h->succedents[0] = blockNode;
1489 
1490 	if (nRodNodes > 0) {
1491 		/* Connect the block node to the Rod nodes. */
1492 		RF_ASSERT(blockNode->numSuccedents == nRodNodes);
1493 		RF_ASSERT(syncNode->numAntecedents == nRodNodes);
1494 		for (i = 0; i < nRodNodes; i++) {
1495 			RF_ASSERT(rodNodes[i].numAntecedents == 1);
1496 			blockNode->succedents[i] = &rodNodes[i];
1497 			rodNodes[i].antecedents[0] = blockNode;
1498 			rodNodes[i].antType[0] = rf_control;
1499 
1500 			/* Connect the Rod nodes to the Nil node. */
1501 			RF_ASSERT(rodNodes[i].numSuccedents == 1);
1502 			rodNodes[i].succedents[0] = syncNode;
1503 			syncNode->antecedents[i] = &rodNodes[i];
1504 			syncNode->antType[i] = rf_trueData;
1505 		}
1506 	} else {
1507 		/* Connect the block node to the Nil node. */
1508 		RF_ASSERT(blockNode->numSuccedents == 1);
1509 		RF_ASSERT(syncNode->numAntecedents == 1);
1510 		blockNode->succedents[0] = syncNode;
1511 		syncNode->antecedents[0] = blockNode;
1512 		syncNode->antType[0] = rf_control;
1513 	}
1514 
1515 	/* Connect the sync node to the Wnd nodes. */
1516 	RF_ASSERT(syncNode->numSuccedents == (1 + nWndNodes));
1517 	for (i = 0; i < nWndNodes; i++) {
1518 		RF_ASSERT(wndNodes->numAntecedents == 1);
1519 		syncNode->succedents[i] = &wndNodes[i];
1520 		wndNodes[i].antecedents[0] = syncNode;
1521 		wndNodes[i].antType[0] = rf_control;
1522 	}
1523 
1524 	/* Connect the sync node to the Xor node. */
1525 	RF_ASSERT(xorNode->numAntecedents == 1);
1526 	syncNode->succedents[nWndNodes] = xorNode;
1527 	xorNode->antecedents[0] = syncNode;
1528 	xorNode->antType[0] = rf_control;
1529 
1530 	/* Connect the xor node to the write parity node. */
1531 	RF_ASSERT(xorNode->numSuccedents == nfaults);
1532 	RF_ASSERT(wnpNode->numAntecedents == 1);
1533 	xorNode->succedents[0] = wnpNode;
1534 	wnpNode->antecedents[0] = xorNode;
1535 	wnpNode->antType[0] = rf_trueData;
1536 	if (nfaults == 2) {
1537 		RF_ASSERT(wnqNode->numAntecedents == 1);
1538 		xorNode->succedents[1] = wnqNode;
1539 		wnqNode->antecedents[0] = xorNode;
1540 		wnqNode->antType[0] = rf_trueData;
1541 	}
1542 	/* Connect the write nodes to the term node. */
1543 	RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
1544 	RF_ASSERT(termNode->numSuccedents == 0);
1545 	for (i = 0; i < nWndNodes; i++) {
1546 		RF_ASSERT(wndNodes->numSuccedents == 1);
1547 		wndNodes[i].succedents[0] = termNode;
1548 		termNode->antecedents[i] = &wndNodes[i];
1549 		termNode->antType[i] = rf_control;
1550 	}
1551 	RF_ASSERT(wnpNode->numSuccedents == 1);
1552 	wnpNode->succedents[0] = termNode;
1553 	termNode->antecedents[nWndNodes] = wnpNode;
1554 	termNode->antType[nWndNodes] = rf_control;
1555 	if (nfaults == 2) {
1556 		RF_ASSERT(wnqNode->numSuccedents == 1);
1557 		wnqNode->succedents[0] = termNode;
1558 		termNode->antecedents[nWndNodes + 1] = wnqNode;
1559 		termNode->antType[nWndNodes + 1] = rf_control;
1560 	}
1561 }
1562 
1563 
1564 /*****************************************************************************
1565  *
1566  * Create a DAG to perform a small-write operation (either raid 5 or pq),
1567  * which is as follows:
1568  *
1569  * Hdr -> Nil -> Rop - Xor - Wnp [Unp] -- Trm
1570  *            \- Rod X- Wnd [Und] -------/
1571  *           [\- Rod X- Wnd [Und] ------/]
1572  *           [\- Roq - Q --> Wnq [Unq]-/]
1573  *
1574  * Rop = read old parity
1575  * Rod = read old data
1576  * Roq = read old "q"
 * Cmt = commit node (not present in this graph: the Fwd DAGs have no commit point)
1578  * Und = unlock data disk
1579  * Unp = unlock parity disk
1580  * Unq = unlock q disk
1581  * Wnp = write new parity
1582  * Wnd = write new data
1583  * Wnq = write new "q"
1584  * [ ] denotes optional segments in the graph.
1585  *
1586  * Parameters:	raidPtr	  - description of the physical array
1587  *		asmap	  - logical & physical addresses for this access
1588  *		bp	  - buffer ptr (holds write data)
1589  *		flags	  - general flags (e.g. disk locking)
1590  *		allocList - list of memory allocated in DAG creation
1591  *		pfuncs	  - list of parity generating functions
1592  *		qfuncs	  - list of q generating functions
1593  *
1594  * A null qfuncs indicates single fault tolerant.
1595  *****************************************************************************/
1596 
1597 void
rf_CommonCreateSmallWriteDAGFwd(RF_Raid_t * raidPtr,RF_AccessStripeMap_t * asmap,RF_DagHeader_t * dag_h,void * bp,RF_RaidAccessFlags_t flags,RF_AllocListElem_t * allocList,RF_RedFuncs_t * pfuncs,RF_RedFuncs_t * qfuncs)1598 rf_CommonCreateSmallWriteDAGFwd(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
1599     RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
1600     RF_AllocListElem_t *allocList, RF_RedFuncs_t *pfuncs, RF_RedFuncs_t *qfuncs)
1601 {
1602 	RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
1603 	RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
1604 	RF_DagNode_t *xorNodes, *qNodes, *blockNode, *nodes;
1605 	RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
1606 	int i, j, nNodes, totalNumNodes, lu_flag;
1607 	RF_ReconUnitNum_t which_ru;
1608 	int (*func) (RF_DagNode_t *);
1609 	int (*undoFunc) (RF_DagNode_t *);
1610 	int (*qfunc) (RF_DagNode_t *);
1611 	int numDataNodes, numParityNodes;
1612 	RF_StripeNum_t parityStripeID;
1613 	RF_PhysDiskAddr_t *pda;
1614 	char *name, *qname;
1615 	long nfaults;
1616 
1617 	nfaults = qfuncs ? 2 : 1;
1618 	lu_flag = (rf_enableAtomicRMW) ? 1 : 0;	/* Lock/unlock flag. */
1619 
1620 	parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
1621 	    asmap->raidAddress, &which_ru);
1622 	pda = asmap->physInfo;
1623 	numDataNodes = asmap->numStripeUnitsAccessed;
1624 	numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
1625 
1626 	if (rf_dagDebug)
1627 		printf("[Creating small-write DAG]\n");
1628 	RF_ASSERT(numDataNodes > 0);
1629 	dag_h->creator = "SmallWriteDAGFwd";
1630 
1631 	dag_h->numCommitNodes = 0;
1632 	dag_h->numCommits = 0;
1633 	dag_h->numSuccedents = 1;
1634 
1635 	qfunc = NULL;
1636 	qname = NULL;
1637 
1638 	/*
1639 	 * DAG creation occurs in four steps:
1640 	 * 1. Count the number of nodes in the DAG.
1641 	 * 2. Create the nodes.
1642 	 * 3. Initialize the nodes.
1643 	 * 4. Connect the nodes.
1644 	 */
1645 
1646 	/* Step 1. Compute number of nodes in the graph. */
1647 
1648 	/*
1649 	 * Number of nodes: a read and write for each data unit, a redundancy
1650 	 * computation node for each parity node (nfaults * nparity), a read
1651 	 * and write for each parity unit, a block node, a terminate node if
1652 	 * atomic RMW, an unlock node for each data/redundancy unit.
1653 	 */
1654 	totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes)
1655 	    + (nfaults * 2 * numParityNodes) + 2;
1656 	if (lu_flag)
1657 		totalNumNodes += (numDataNodes + (nfaults * numParityNodes));
1658 
1659 
1660 	/* Step 2. Create the nodes. */
1661 	RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t),
1662 	    (RF_DagNode_t *), allocList);
1663 	i = 0;
1664 	blockNode = &nodes[i];
1665 	i += 1;
1666 	readDataNodes = &nodes[i];
1667 	i += numDataNodes;
1668 	readParityNodes = &nodes[i];
1669 	i += numParityNodes;
1670 	writeDataNodes = &nodes[i];
1671 	i += numDataNodes;
1672 	writeParityNodes = &nodes[i];
1673 	i += numParityNodes;
1674 	xorNodes = &nodes[i];
1675 	i += numParityNodes;
1676 	termNode = &nodes[i];
1677 	i += 1;
1678 	if (lu_flag) {
1679 		unlockDataNodes = &nodes[i];
1680 		i += numDataNodes;
1681 		unlockParityNodes = &nodes[i];
1682 		i += numParityNodes;
1683 	} else {
1684 		unlockDataNodes = unlockParityNodes = NULL;
1685 	}
1686 	if (nfaults == 2) {
1687 		readQNodes = &nodes[i];
1688 		i += numParityNodes;
1689 		writeQNodes = &nodes[i];
1690 		i += numParityNodes;
1691 		qNodes = &nodes[i];
1692 		i += numParityNodes;
1693 		if (lu_flag) {
1694 			unlockQNodes = &nodes[i];
1695 			i += numParityNodes;
1696 		} else {
1697 			unlockQNodes = NULL;
1698 		}
1699 	} else {
1700 		readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;
1701 	}
1702 	RF_ASSERT(i == totalNumNodes);
1703 
1704 	/* Step 3. Initialize the nodes. */
1705 	/* Initialize block node (Nil). */
1706 	nNodes = numDataNodes + (nfaults * numParityNodes);
1707 	rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
1708 	    rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h,
1709 	    "Nil", allocList);
1710 
1711 	/* Initialize terminate node (Trm). */
1712 	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
1713 	    rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0, dag_h,
1714 	    "Trm", allocList);
1715 
1716 	/* Initialize nodes which read old data (Rod). */
1717 	for (i = 0; i < numDataNodes; i++) {
1718 		rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE,
1719 		    rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
1720 		    (numParityNodes * nfaults) + 1, 1, 4, 0, dag_h,
1721 		    "Rod", allocList);
1722 		RF_ASSERT(pda != NULL);
1723 		/* Physical disk addr desc. */
1724 		readDataNodes[i].params[0].p = pda;
1725 		/* Buffer to hold old data. */
1726 		readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h,
1727 		    pda, allocList);
1728 		readDataNodes[i].params[2].v = parityStripeID;
1729 		readDataNodes[i].params[3].v =
1730 		    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1731 		    lu_flag, 0, which_ru);
1732 		pda = pda->next;
1733 		for (j = 0; j < readDataNodes[i].numSuccedents; j++)
1734 			readDataNodes[i].propList[j] = NULL;
1735 	}
1736 
1737 	/* Initialize nodes which read old parity (Rop). */
1738 	pda = asmap->parityInfo;
1739 	i = 0;
1740 	for (i = 0; i < numParityNodes; i++) {
1741 		RF_ASSERT(pda != NULL);
1742 		rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE,
1743 		    rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
1744 		    numParityNodes, 1, 4, 0, dag_h, "Rop", allocList);
1745 		readParityNodes[i].params[0].p = pda;
1746 		/* Buffer to hold old parity. */
1747 		readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
1748 		    dag_h, pda, allocList);
1749 		readParityNodes[i].params[2].v = parityStripeID;
1750 		readParityNodes[i].params[3].v =
1751 		    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1752 		    lu_flag, 0, which_ru);
1753 		for (j = 0; j < readParityNodes[i].numSuccedents; j++)
1754 			readParityNodes[i].propList[0] = NULL;
1755 		pda = pda->next;
1756 	}
1757 
1758 	/* Initialize nodes which read old Q (Roq). */
1759 	if (nfaults == 2) {
1760 		pda = asmap->qInfo;
1761 		for (i = 0; i < numParityNodes; i++) {
1762 			RF_ASSERT(pda != NULL);
1763 			rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE,
1764 			    rf_DiskReadFunc, rf_DiskReadUndoFunc,
1765 			    rf_GenericWakeupFunc, numParityNodes, 1, 4, 0,
1766 			    dag_h, "Roq", allocList);
1767 			readQNodes[i].params[0].p = pda;
1768 			/* Buffer to hold old Q. */
1769 			readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
1770 			    dag_h, pda, allocList);
1771 			readQNodes[i].params[2].v = parityStripeID;
1772 			readQNodes[i].params[3].v =
1773 			    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1774 			    lu_flag, 0, which_ru);
1775 			for (j = 0; j < readQNodes[i].numSuccedents; j++)
1776 				readQNodes[i].propList[0] = NULL;
1777 			pda = pda->next;
1778 		}
1779 	}
1780 	/* Initialize nodes which write new data (Wnd). */
1781 	pda = asmap->physInfo;
1782 	for (i = 0; i < numDataNodes; i++) {
1783 		RF_ASSERT(pda != NULL);
1784 		rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE,
1785 		    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1786 		    rf_GenericWakeupFunc, 1, 1, 4, 0,
1787 		    dag_h, "Wnd", allocList);
1788 		/* Physical disk addr desc. */
1789 		writeDataNodes[i].params[0].p = pda;
1790 		/* Buffer holding new data to be written. */
1791 		writeDataNodes[i].params[1].p = pda->bufPtr;
1792 		writeDataNodes[i].params[2].v = parityStripeID;
1793 		writeDataNodes[i].params[3].v =
1794 		    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1795 
1796 		if (lu_flag) {
1797 			/* Initialize node to unlock the disk queue. */
1798 			rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE,
1799 			    rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc,
1800 			    rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
1801 			    "Und", allocList);
1802 			/* Physical disk addr desc. */
1803 			unlockDataNodes[i].params[0].p = pda;
1804 			unlockDataNodes[i].params[1].v =
1805 			    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1806 			    0, lu_flag, which_ru);
1807 		}
1808 		pda = pda->next;
1809 	}
1810 
1811 
1812 	/* Initialize nodes which compute new parity and Q. */
1813 	/*
1814 	 * Use the simple XOR func in the double-XOR case, and when
1815 	 * accessing only a portion of one stripe unit. The distinction
1816 	 * between the two is that the regular XOR func assumes that the
1817 	 * targbuf is a full SU in size, and examines the pda associated with
1818 	 * the buffer to decide where within the buffer to XOR the data,
1819 	 * whereas the simple XOR func just XORs the data into the start of
1820 	 * the buffer.
1821 	 */
1822 	if ((numParityNodes == 2) || ((numDataNodes == 1) &&
1823 	    (asmap->totalSectorsAccessed <
1824 	     raidPtr->Layout.sectorsPerStripeUnit))) {
1825 		func = pfuncs->simple;
1826 		undoFunc = rf_NullNodeUndoFunc;
1827 		name = pfuncs->SimpleName;
1828 		if (qfuncs) {
1829 			qfunc = qfuncs->simple;
1830 			qname = qfuncs->SimpleName;
1831 		}
1832 	} else {
1833 		func = pfuncs->regular;
1834 		undoFunc = rf_NullNodeUndoFunc;
1835 		name = pfuncs->RegularName;
1836 		if (qfuncs) {
1837 			qfunc = qfuncs->regular;
1838 			qname = qfuncs->RegularName;
1839 		}
1840 	}
1841 	/*
1842 	 * Initialize the xor nodes: params are {pda,buf} from {Rod,Wnd,Rop}
1843 	 * nodes, and raidPtr.
1844 	 */
1845 	if (numParityNodes == 2) {	/* Double-xor case. */
1846 		for (i = 0; i < numParityNodes; i++) {
1847 			/* No wakeup func for xor. */
1848 			rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func,
1849 			    undoFunc, NULL, numParityNodes, numParityNodes +
1850 			    numDataNodes, 7, 1, dag_h, name, allocList);
1851 			xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
1852 			xorNodes[i].params[0] = readDataNodes[i].params[0];
1853 			xorNodes[i].params[1] = readDataNodes[i].params[1];
1854 			xorNodes[i].params[2] = readParityNodes[i].params[0];
1855 			xorNodes[i].params[3] = readParityNodes[i].params[1];
1856 			xorNodes[i].params[4] = writeDataNodes[i].params[0];
1857 			xorNodes[i].params[5] = writeDataNodes[i].params[1];
1858 			xorNodes[i].params[6].p = raidPtr;
1859 			/* Use old parity buf as target buf. */
1860 			xorNodes[i].results[0] = readParityNodes[i].params[1].p;
1861 			if (nfaults == 2) {
1862 				/* No wakeup func for xor. */
1863 				rf_InitNode(&qNodes[i], rf_wait, RF_FALSE,
1864 				    qfunc, undoFunc, NULL, numParityNodes,
1865 				    numParityNodes + numDataNodes, 7, 1,
1866 				    dag_h, qname, allocList);
1867 				qNodes[i].params[0] =
1868 				    readDataNodes[i].params[0];
1869 				qNodes[i].params[1] =
1870 				    readDataNodes[i].params[1];
1871 				qNodes[i].params[2] = readQNodes[i].params[0];
1872 				qNodes[i].params[3] = readQNodes[i].params[1];
1873 				qNodes[i].params[4] =
1874 				    writeDataNodes[i].params[0];
1875 				qNodes[i].params[5] =
1876 				    writeDataNodes[i].params[1];
1877 				qNodes[i].params[6].p = raidPtr;
1878 				/* Use old Q buf as target buf. */
1879 				qNodes[i].results[0] =
1880 				    readQNodes[i].params[1].p;
1881 			}
1882 		}
1883 	} else {
1884 		/* There is only one xor node in this case. */
1885 		rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc,
1886 		    NULL, numParityNodes, numParityNodes + numDataNodes,
1887 		    (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h,
1888 		    name, allocList);
1889 		xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
1890 		for (i = 0; i < numDataNodes + 1; i++) {
1891 			/* Set up params related to Rod and Rop nodes. */
1892 			xorNodes[0].params[2 * i + 0] =
1893 			    readDataNodes[i].params[0];	/* pda */
1894 			xorNodes[0].params[2 * i + 1] =
1895 			    readDataNodes[i].params[1];	/* buffer pointer */
1896 		}
1897 		for (i = 0; i < numDataNodes; i++) {
1898 			/* Set up params related to Wnd and Wnp nodes. */
1899 			xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] =
1900 			    writeDataNodes[i].params[0]; /* pda */
1901 			xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] =
1902 			    writeDataNodes[i].params[1]; /* buffer pointer */
1903 		}
1904 		xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p =
1905 		    raidPtr;	/* xor node needs to get at RAID information */
1906 		xorNodes[0].results[0] = readParityNodes[0].params[1].p;
1907 		if (nfaults == 2) {
1908 			rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc,
1909 			    undoFunc, NULL, numParityNodes,
1910 			    numParityNodes + numDataNodes,
1911 			    (2 * (numDataNodes + numDataNodes + 1) + 1),
1912 			    1, dag_h, qname, allocList);
1913 			for (i = 0; i < numDataNodes; i++) {
1914 				/* Set up params related to Rod. */
1915 				/* pda */
1916 				qNodes[0].params[2 * i + 0] =
1917 				    readDataNodes[i].params[0];
1918 				/* buffer pointer */
1919 				qNodes[0].params[2 * i + 1] =
1920 				    readDataNodes[i].params[1];
1921 			}
1922 			/* And read old q. */
1923 			qNodes[0].params[2 * numDataNodes + 0] =
1924 			    readQNodes[0].params[0];	/* pda */
1925 			qNodes[0].params[2 * numDataNodes + 1] =
1926 			    readQNodes[0].params[1];	/* buffer pointer */
1927 			for (i = 0; i < numDataNodes; i++) {
1928 				/* Set up params related to Wnd nodes. */
1929 				/* pda */
1930 				qNodes[0].params
1931 				    [2 * (numDataNodes + 1 + i) + 0] =
1932 				    writeDataNodes[i].params[0];
1933 				/* buffer pointer */
1934 				qNodes[0].params
1935 				    [2 * (numDataNodes + 1 + i) + 1] =
1936 				    writeDataNodes[i].params[1];
1937 			}
1938 			/* Xor node needs to get at RAID information. */
1939 			qNodes[0].params
1940 			    [2 * (numDataNodes + numDataNodes + 1)].p =
1941 			    raidPtr;
1942 			qNodes[0].results[0] = readQNodes[0].params[1].p;
1943 		}
1944 	}
1945 
1946 	/* Initialize nodes which write new parity (Wnp). */
1947 	pda = asmap->parityInfo;
1948 	for (i = 0; i < numParityNodes; i++) {
1949 		rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE,
1950 		    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1951 		    rf_GenericWakeupFunc, 1, numParityNodes,
1952 		    4, 0, dag_h, "Wnp", allocList);
1953 		RF_ASSERT(pda != NULL);
1954 		/* Param 1 (bufPtr) filled in by xor node. */
1955 		writeParityNodes[i].params[0].p = pda;
1956 		/* Buffer pointer for parity write operation. */
1957 		writeParityNodes[i].params[1].p = xorNodes[i].results[0];
1958 		writeParityNodes[i].params[2].v = parityStripeID;
1959 		writeParityNodes[i].params[3].v =
1960 		    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1961 
1962 		if (lu_flag) {
1963 			/* Initialize node to unlock the disk queue. */
1964 			rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE,
1965 			    rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc,
1966 			    rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
1967 			    "Unp", allocList);
1968 			unlockParityNodes[i].params[0].p =
1969 			    pda;	/* Physical disk addr desc. */
1970 			unlockParityNodes[i].params[1].v =
1971 			    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1972 			    0, lu_flag, which_ru);
1973 		}
1974 		pda = pda->next;
1975 	}
1976 
1977 	/* Initialize nodes which write new Q (Wnq). */
1978 	if (nfaults == 2) {
1979 		pda = asmap->qInfo;
1980 		for (i = 0; i < numParityNodes; i++) {
1981 			rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE,
1982 			    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1983 			    rf_GenericWakeupFunc, 1, numParityNodes,
1984 			    4, 0, dag_h, "Wnq", allocList);
1985 			RF_ASSERT(pda != NULL);
1986 			/* Param 1 (bufPtr) filled in by xor node. */
1987 			writeQNodes[i].params[0].p = pda;
1988 			/* Buffer pointer for parity write operation. */
1989 			writeQNodes[i].params[1].p = qNodes[i].results[0];
1990 			writeQNodes[i].params[2].v = parityStripeID;
1991 			writeQNodes[i].params[3].v =
1992 			    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1993 			    0, 0, which_ru);
1994 
1995 			if (lu_flag) {
1996 				/* Initialize node to unlock the disk queue. */
1997 				rf_InitNode(&unlockQNodes[i], rf_wait,
1998 				    RF_FALSE, rf_DiskUnlockFunc,
1999 				    rf_DiskUnlockUndoFunc,
2000 				    rf_GenericWakeupFunc, 1, 1, 2, 0,
2001 				    dag_h, "Unq", allocList);
2002 				/* Physical disk addr desc. */
2003 				unlockQNodes[i].params[0].p = pda;
2004 				unlockQNodes[i].params[1].v =
2005 				    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
2006 				    0, lu_flag, which_ru);
2007 			}
2008 			pda = pda->next;
2009 		}
2010 	}
2011 	/* Step 4. Connect the nodes. */
2012 
2013 	/* Connect header to block node. */
2014 	dag_h->succedents[0] = blockNode;
2015 
2016 	/* Connect block node to read old data nodes. */
2017 	RF_ASSERT(blockNode->numSuccedents ==
2018 	    (numDataNodes + (numParityNodes * nfaults)));
2019 	for (i = 0; i < numDataNodes; i++) {
2020 		blockNode->succedents[i] = &readDataNodes[i];
2021 		RF_ASSERT(readDataNodes[i].numAntecedents == 1);
2022 		readDataNodes[i].antecedents[0] = blockNode;
2023 		readDataNodes[i].antType[0] = rf_control;
2024 	}
2025 
2026 	/* Connect block node to read old parity nodes. */
2027 	for (i = 0; i < numParityNodes; i++) {
2028 		blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
2029 		RF_ASSERT(readParityNodes[i].numAntecedents == 1);
2030 		readParityNodes[i].antecedents[0] = blockNode;
2031 		readParityNodes[i].antType[0] = rf_control;
2032 	}
2033 
2034 	/* Connect block node to read old Q nodes. */
2035 	if (nfaults == 2)
2036 		for (i = 0; i < numParityNodes; i++) {
2037 			blockNode->succedents[numDataNodes +
2038 			    numParityNodes + i] = &readQNodes[i];
2039 			RF_ASSERT(readQNodes[i].numAntecedents == 1);
2040 			readQNodes[i].antecedents[0] = blockNode;
2041 			readQNodes[i].antType[0] = rf_control;
2042 		}
2043 
2044 	/* Connect read old data nodes to write new data nodes. */
2045 	for (i = 0; i < numDataNodes; i++) {
2046 		RF_ASSERT(readDataNodes[i].numSuccedents ==
2047 		    ((nfaults * numParityNodes) + 1));
2048 		RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
2049 		readDataNodes[i].succedents[0] = &writeDataNodes[i];
2050 		writeDataNodes[i].antecedents[0] = &readDataNodes[i];
2051 		writeDataNodes[i].antType[0] = rf_antiData;
2052 	}
2053 
2054 	/* Connect read old data nodes to xor nodes. */
2055 	for (i = 0; i < numDataNodes; i++) {
2056 		for (j = 0; j < numParityNodes; j++) {
2057 			RF_ASSERT(xorNodes[j].numAntecedents ==
2058 			    numDataNodes + numParityNodes);
2059 			readDataNodes[i].succedents[1 + j] = &xorNodes[j];
2060 			xorNodes[j].antecedents[i] = &readDataNodes[i];
2061 			xorNodes[j].antType[i] = rf_trueData;
2062 		}
2063 	}
2064 
2065 	/* Connect read old data nodes to q nodes. */
2066 	if (nfaults == 2)
2067 		for (i = 0; i < numDataNodes; i++)
2068 			for (j = 0; j < numParityNodes; j++) {
2069 				RF_ASSERT(qNodes[j].numAntecedents ==
2070 				    numDataNodes + numParityNodes);
2071 				readDataNodes[i].succedents
2072 				    [1 + numParityNodes + j] = &qNodes[j];
2073 				qNodes[j].antecedents[i] = &readDataNodes[i];
2074 				qNodes[j].antType[i] = rf_trueData;
2075 			}
2076 
2077 	/* Connect read old parity nodes to xor nodes. */
2078 	for (i = 0; i < numParityNodes; i++) {
2079 		for (j = 0; j < numParityNodes; j++) {
2080 			RF_ASSERT(readParityNodes[i].numSuccedents ==
2081 			    numParityNodes);
2082 			readParityNodes[i].succedents[j] = &xorNodes[j];
2083 			xorNodes[j].antecedents[numDataNodes + i] =
2084 			    &readParityNodes[i];
2085 			xorNodes[j].antType[numDataNodes + i] = rf_trueData;
2086 		}
2087 	}
2088 
2089 	/* Connect read old q nodes to q nodes. */
2090 	if (nfaults == 2)
2091 		for (i = 0; i < numParityNodes; i++) {
2092 			for (j = 0; j < numParityNodes; j++) {
2093 				RF_ASSERT(readQNodes[i].numSuccedents ==
2094 				    numParityNodes);
2095 				readQNodes[i].succedents[j] = &qNodes[j];
2096 				qNodes[j].antecedents[numDataNodes + i] =
2097 				    &readQNodes[i];
2098 				qNodes[j].antType[numDataNodes + i] =
2099 				    rf_trueData;
2100 			}
2101 		}
2102 
2103 	/* Connect xor nodes to the write new parity nodes. */
2104 	for (i = 0; i < numParityNodes; i++) {
2105 		RF_ASSERT(writeParityNodes[i].numAntecedents == numParityNodes);
2106 		for (j = 0; j < numParityNodes; j++) {
2107 			RF_ASSERT(xorNodes[j].numSuccedents == numParityNodes);
2108 			xorNodes[i].succedents[j] = &writeParityNodes[j];
2109 			writeParityNodes[j].antecedents[i] = &xorNodes[i];
2110 			writeParityNodes[j].antType[i] = rf_trueData;
2111 		}
2112 	}
2113 
2114 	/* Connect q nodes to the write new q nodes. */
2115 	if (nfaults == 2)
2116 		for (i = 0; i < numParityNodes; i++) {
2117 			RF_ASSERT(writeQNodes[i].numAntecedents ==
2118 			    numParityNodes);
2119 			for (j = 0; j < numParityNodes; j++) {
2120 				RF_ASSERT(qNodes[j].numSuccedents == 1);
2121 				qNodes[i].succedents[j] = &writeQNodes[j];
2122 				writeQNodes[j].antecedents[i] = &qNodes[i];
2123 				writeQNodes[j].antType[i] = rf_trueData;
2124 			}
2125 		}
2126 
2127 	RF_ASSERT(termNode->numAntecedents ==
2128 	    (numDataNodes + (nfaults * numParityNodes)));
2129 	RF_ASSERT(termNode->numSuccedents == 0);
2130 	for (i = 0; i < numDataNodes; i++) {
2131 		if (lu_flag) {
2132 			/* Connect write new data nodes to unlock nodes. */
2133 			RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
2134 			RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
2135 			writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
2136 			unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
2137 			unlockDataNodes[i].antType[0] = rf_control;
2138 
2139 			/* Connect unlock nodes to term nodes. */
2140 			RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
2141 			unlockDataNodes[i].succedents[0] = termNode;
2142 			termNode->antecedents[i] = &unlockDataNodes[i];
2143 			termNode->antType[i] = rf_control;
2144 		} else {
2145 			/* Connect write new data nodes to term node. */
2146 			RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
2147 			RF_ASSERT(termNode->numAntecedents ==
2148 			    (numDataNodes + (nfaults * numParityNodes)));
2149 			writeDataNodes[i].succedents[0] = termNode;
2150 			termNode->antecedents[i] = &writeDataNodes[i];
2151 			termNode->antType[i] = rf_control;
2152 		}
2153 	}
2154 
2155 	for (i = 0; i < numParityNodes; i++) {
2156 		if (lu_flag) {
2157 			/* Connect write new parity nodes to unlock nodes. */
2158 			RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
2159 			RF_ASSERT(unlockParityNodes[i].numAntecedents == 1);
2160 			writeParityNodes[i].succedents[0] =
2161 			    &unlockParityNodes[i];
2162 			unlockParityNodes[i].antecedents[0] =
2163 			    &writeParityNodes[i];
2164 			unlockParityNodes[i].antType[0] = rf_control;
2165 
2166 			/* Connect unlock nodes to term node. */
2167 			RF_ASSERT(unlockParityNodes[i].numSuccedents == 1);
2168 			unlockParityNodes[i].succedents[0] = termNode;
2169 			termNode->antecedents[numDataNodes + i] =
2170 			    &unlockParityNodes[i];
2171 			termNode->antType[numDataNodes + i] = rf_control;
2172 		} else {
2173 			RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
2174 			writeParityNodes[i].succedents[0] = termNode;
2175 			termNode->antecedents[numDataNodes + i] =
2176 			    &writeParityNodes[i];
2177 			termNode->antType[numDataNodes + i] = rf_control;
2178 		}
2179 	}
2180 
2181 	if (nfaults == 2)
2182 		for (i = 0; i < numParityNodes; i++) {
2183 			if (lu_flag) {
2184 				/* Connect write new Q nodes to unlock nodes. */
2185 				RF_ASSERT(writeQNodes[i].numSuccedents == 1);
2186 				RF_ASSERT(unlockQNodes[i].numAntecedents == 1);
2187 				writeQNodes[i].succedents[0] = &unlockQNodes[i];
2188 				unlockQNodes[i].antecedents[0] =
2189 				    &writeQNodes[i];
2190 				unlockQNodes[i].antType[0] = rf_control;
2191 
2192 				/* Connect unlock nodes to unblock node. */
2193 				RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
2194 				unlockQNodes[i].succedents[0] = termNode;
2195 				termNode->antecedents[numDataNodes +
2196 				    numParityNodes + i] = &unlockQNodes[i];
2197 				termNode->antType[numDataNodes +
2198 				    numParityNodes + i] = rf_control;
2199 			} else {
2200 				RF_ASSERT(writeQNodes[i].numSuccedents == 1);
2201 				writeQNodes[i].succedents[0] = termNode;
2202 				termNode->antecedents[numDataNodes +
2203 				    numParityNodes + i] = &writeQNodes[i];
2204 				termNode->antType[numDataNodes +
2205 				    numParityNodes + i] = rf_control;
2206 			}
2207 		}
2208 }
2209 
2210 
2211 
2212 /*****************************************************************************
2213  * Create a write graph (fault-free or degraded) for RAID level 1.
2214  *
2215  * Hdr  Nil -> Wpd -> Nil -> Trm
2216  *	Nil -> Wsd ->
2217  *
2218  * The "Wpd" node writes data to the primary copy in the mirror pair.
2219  * The "Wsd" node writes data to the secondary copy in the mirror pair.
2220  *
2221  * Parameters:	raidPtr	  - description of the physical array
2222  *		asmap	  - logical & physical addresses for this access
2223  *		bp	  - buffer ptr (holds write data)
2224  *		flags	  - general flags (e.g. disk locking)
2225  *		allocList - list of memory allocated in DAG creation
2226  *****************************************************************************/
2227 
2228 void
rf_CreateRaidOneWriteDAGFwd(RF_Raid_t * raidPtr,RF_AccessStripeMap_t * asmap,RF_DagHeader_t * dag_h,void * bp,RF_RaidAccessFlags_t flags,RF_AllocListElem_t * allocList)2229 rf_CreateRaidOneWriteDAGFwd(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
2230     RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
2231     RF_AllocListElem_t *allocList)
2232 {
2233 	RF_DagNode_t *blockNode, *unblockNode, *termNode;
2234 	RF_DagNode_t *nodes, *wndNode, *wmirNode;
2235 	int nWndNodes, nWmirNodes, i;
2236 	RF_ReconUnitNum_t which_ru;
2237 	RF_PhysDiskAddr_t *pda, *pdaP;
2238 	RF_StripeNum_t parityStripeID;
2239 
2240 	parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
2241 	    asmap->raidAddress, &which_ru);
2242 	if (rf_dagDebug) {
2243 		printf("[Creating RAID level 1 write DAG]\n");
2244 	}
2245 	/* 2 implies access not SU aligned. */
2246 	nWmirNodes = (asmap->parityInfo->next) ? 2 : 1;
2247 	nWndNodes = (asmap->physInfo->next) ? 2 : 1;
2248 
2249 	/* Alloc the Wnd nodes and the Wmir node. */
2250 	if (asmap->numDataFailed == 1)
2251 		nWndNodes--;
2252 	if (asmap->numParityFailed == 1)
2253 		nWmirNodes--;
2254 
2255 	/*
2256 	 * Total number of nodes = nWndNodes + nWmirNodes +
2257 	 *			   (block + unblock + terminator)
2258 	 */
2259 	RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3,
2260 	    sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
2261 	i = 0;
2262 	wndNode = &nodes[i];
2263 	i += nWndNodes;
2264 	wmirNode = &nodes[i];
2265 	i += nWmirNodes;
2266 	blockNode = &nodes[i];
2267 	i += 1;
2268 	unblockNode = &nodes[i];
2269 	i += 1;
2270 	termNode = &nodes[i];
2271 	i += 1;
2272 	RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));
2273 
2274 	/* This dag can commit immediately. */
2275 	dag_h->numCommitNodes = 0;
2276 	dag_h->numCommits = 0;
2277 	dag_h->numSuccedents = 1;
2278 
2279 	/* Initialize the unblock and term nodes. */
2280 	rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
2281 	    rf_NullNodeUndoFunc, NULL, (nWndNodes + nWmirNodes),
2282 	    0, 0, 0, dag_h, "Nil", allocList);
2283 	rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
2284 	    rf_NullNodeUndoFunc, NULL, 1, (nWndNodes + nWmirNodes),
2285 	    0, 0, dag_h, "Nil", allocList);
2286 	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
2287 	    rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
2288 
2289 	/* Initialize the wnd nodes. */
2290 	if (nWndNodes > 0) {
2291 		pda = asmap->physInfo;
2292 		for (i = 0; i < nWndNodes; i++) {
2293 			rf_InitNode(&wndNode[i], rf_wait, RF_FALSE,
2294 			    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
2295 			    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
2296 			    "Wpd", allocList);
2297 			RF_ASSERT(pda != NULL);
2298 			wndNode[i].params[0].p = pda;
2299 			wndNode[i].params[1].p = pda->bufPtr;
2300 			wndNode[i].params[2].v = parityStripeID;
2301 			wndNode[i].params[3].v =
2302 			    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
2303 			    0, 0, which_ru);
2304 			pda = pda->next;
2305 		}
2306 		RF_ASSERT(pda == NULL);
2307 	}
2308 	/* Initialize the mirror nodes. */
2309 	if (nWmirNodes > 0) {
2310 		pda = asmap->physInfo;
2311 		pdaP = asmap->parityInfo;
2312 		for (i = 0; i < nWmirNodes; i++) {
2313 			rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE,
2314 			    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
2315 			    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
2316 			    "Wsd", allocList);
2317 			RF_ASSERT(pda != NULL);
2318 			wmirNode[i].params[0].p = pdaP;
2319 			wmirNode[i].params[1].p = pda->bufPtr;
2320 			wmirNode[i].params[2].v = parityStripeID;
2321 			wmirNode[i].params[3].v =
2322 			    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
2323 			    0, 0, which_ru);
2324 			pda = pda->next;
2325 			pdaP = pdaP->next;
2326 		}
2327 		RF_ASSERT(pda == NULL);
2328 		RF_ASSERT(pdaP == NULL);
2329 	}
2330 	/* Link the header node to the block node. */
2331 	RF_ASSERT(dag_h->numSuccedents == 1);
2332 	RF_ASSERT(blockNode->numAntecedents == 0);
2333 	dag_h->succedents[0] = blockNode;
2334 
2335 	/* Link the block node to the write nodes. */
2336 	RF_ASSERT(blockNode->numSuccedents == (nWndNodes + nWmirNodes));
2337 	for (i = 0; i < nWndNodes; i++) {
2338 		RF_ASSERT(wndNode[i].numAntecedents == 1);
2339 		blockNode->succedents[i] = &wndNode[i];
2340 		wndNode[i].antecedents[0] = blockNode;
2341 		wndNode[i].antType[0] = rf_control;
2342 	}
2343 	for (i = 0; i < nWmirNodes; i++) {
2344 		RF_ASSERT(wmirNode[i].numAntecedents == 1);
2345 		blockNode->succedents[i + nWndNodes] = &wmirNode[i];
2346 		wmirNode[i].antecedents[0] = blockNode;
2347 		wmirNode[i].antType[0] = rf_control;
2348 	}
2349 
2350 	/* Link the write nodes to the unblock node. */
2351 	RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
2352 	for (i = 0; i < nWndNodes; i++) {
2353 		RF_ASSERT(wndNode[i].numSuccedents == 1);
2354 		wndNode[i].succedents[0] = unblockNode;
2355 		unblockNode->antecedents[i] = &wndNode[i];
2356 		unblockNode->antType[i] = rf_control;
2357 	}
2358 	for (i = 0; i < nWmirNodes; i++) {
2359 		RF_ASSERT(wmirNode[i].numSuccedents == 1);
2360 		wmirNode[i].succedents[0] = unblockNode;
2361 		unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
2362 		unblockNode->antType[i + nWndNodes] = rf_control;
2363 	}
2364 
2365 	/* Link the unblock node to the term node. */
2366 	RF_ASSERT(unblockNode->numSuccedents == 1);
2367 	RF_ASSERT(termNode->numAntecedents == 1);
2368 	RF_ASSERT(termNode->numSuccedents == 0);
2369 	unblockNode->succedents[0] = termNode;
2370 	termNode->antecedents[0] = unblockNode;
2371 	termNode->antType[0] = rf_control;
2372 
2373 	return;
2374 }
2375