1 /*	$OpenBSD: rf_parityloggingdags.c,v 1.4 2002/12/16 07:01:04 tdeval Exp $	*/
2 /*	$NetBSD: rf_parityloggingdags.c,v 1.4 2000/01/07 03:41:04 oster Exp $	*/
3 
4 /*
5  * Copyright (c) 1995 Carnegie-Mellon University.
6  * All rights reserved.
7  *
8  * Author: William V. Courtright II
9  *
10  * Permission to use, copy, modify and distribute this software and
11  * its documentation is hereby granted, provided that both the copyright
12  * notice and this permission notice appear in all copies of the
13  * software, derivative works or modified versions, and any portions
14  * thereof, and that both notices appear in supporting documentation.
15  *
16  * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
17  * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
18  * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
19  *
20  * Carnegie Mellon requests users of this software to return to
21  *
22  *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
23  *  School of Computer Science
24  *  Carnegie Mellon University
25  *  Pittsburgh PA 15213-3890
26  *
27  * any improvements or extensions that they make and grant Carnegie the
28  * rights to redistribute these changes.
29  */
30 
31 #include "rf_archs.h"
32 
33 #if	RF_INCLUDE_PARITYLOGGING > 0
34 
35 /*
36  * DAGs specific to parity logging are created here.
37  */
38 
39 #include "rf_types.h"
40 #include "rf_raid.h"
41 #include "rf_dag.h"
42 #include "rf_dagutils.h"
43 #include "rf_dagfuncs.h"
44 #include "rf_debugMem.h"
45 #include "rf_paritylog.h"
46 #include "rf_memchunk.h"
47 #include "rf_general.h"
48 
49 #include "rf_parityloggingdags.h"
50 
51 /*****************************************************************************
52  *
53  * Creates a DAG to perform a large-write operation:
54  *
55  *         / Rod \     / Wnd \
56  * H -- NIL- Rod - NIL - Wnd ------ NIL - T
57  *         \ Rod /     \ Xor - Lpo /
58  *
59  * The writes are not done until the reads complete because if they were done
60  * in parallel, a failure on one of the reads could leave the parity in an
61  * inconsistent state, so that the retry with a new DAG would produce
62  * erroneous parity.
63  *
64  * Note:  This DAG has the nasty property that none of the buffers allocated
65  *        for reading old data can be freed until the XOR node fires.
66  *        Need to fix this.
67  *
68  * The last two arguments are the number of faults tolerated, and function
69  * for the redundancy calculation. The undo for the redundancy calc is assumed
70  * to be null.
71  *
72  *****************************************************************************/
73 
74 void
rf_CommonCreateParityLoggingLargeWriteDAG(RF_Raid_t * raidPtr,RF_AccessStripeMap_t * asmap,RF_DagHeader_t * dag_h,void * bp,RF_RaidAccessFlags_t flags,RF_AllocListElem_t * allocList,int nfaults,int (* redFunc)(RF_DagNode_t *))75 rf_CommonCreateParityLoggingLargeWriteDAG(RF_Raid_t * raidPtr,
76     RF_AccessStripeMap_t *asmap, RF_DagHeader_t *dag_h, void *bp,
77     RF_RaidAccessFlags_t flags, RF_AllocListElem_t *allocList, int nfaults,
78     int (*redFunc) (RF_DagNode_t *))
79 {
80 	RF_DagNode_t *nodes, *wndNodes, *rodNodes = NULL, *syncNode, *xorNode;
81 	RF_DagNode_t *lpoNode, *blockNode, *unblockNode, *termNode;
82 	int nWndNodes, nRodNodes, i;
83 	RF_RaidLayout_t *layoutPtr = &(raidPtr->Layout);
84 	RF_AccessStripeMapHeader_t *new_asm_h[2];
85 	int nodeNum, asmNum;
86 	RF_ReconUnitNum_t which_ru;
87 	char *sosBuffer, *eosBuffer;
88 	RF_PhysDiskAddr_t *pda;
89 	RF_StripeNum_t parityStripeID =
90 	    rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
91 	     asmap->raidAddress, &which_ru);
92 
93 	if (rf_dagDebug)
94 		printf("[Creating parity-logging large-write DAG]\n");
95 	RF_ASSERT(nfaults == 1); /* This arch only single fault tolerant. */
96 	dag_h->creator = "ParityLoggingLargeWriteDAG";
97 
98 	/* Alloc the Wnd nodes, the xor node, and the Lpo node. */
99 	nWndNodes = asmap->numStripeUnitsAccessed;
100 	RF_CallocAndAdd(nodes, nWndNodes + 6, sizeof(RF_DagNode_t),
101 	    (RF_DagNode_t *), allocList);
102 	i = 0;
103 	wndNodes = &nodes[i];
104 	i += nWndNodes;
105 	xorNode = &nodes[i];
106 	i += 1;
107 	lpoNode = &nodes[i];
108 	i += 1;
109 	blockNode = &nodes[i];
110 	i += 1;
111 	syncNode = &nodes[i];
112 	i += 1;
113 	unblockNode = &nodes[i];
114 	i += 1;
115 	termNode = &nodes[i];
116 	i += 1;
117 
118 	dag_h->numCommitNodes = nWndNodes + 1;
119 	dag_h->numCommits = 0;
120 	dag_h->numSuccedents = 1;
121 
122 	rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h,
123 	    new_asm_h, &nRodNodes, &sosBuffer, &eosBuffer, allocList);
124 	if (nRodNodes > 0)
125 		RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t),
126 		    (RF_DagNode_t *), allocList);
127 
128 	/* Begin node initialization. */
129 	rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
130 	    rf_NullNodeUndoFunc, NULL, nRodNodes + 1, 0, 0, 0, dag_h,
131 	    "Nil", allocList);
132 	rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
133 	    rf_NullNodeUndoFunc, NULL, 1, nWndNodes + 1, 0, 0, dag_h,
134 	    "Nil", allocList);
135 	rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
136 	    rf_NullNodeUndoFunc, NULL, nWndNodes + 1, nRodNodes + 1,
137 	    0, 0, dag_h, "Nil", allocList);
138 	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
139 	    rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
140 
141 	/* Initialize the Rod nodes. */
142 	for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
143 		if (new_asm_h[asmNum]) {
144 			pda = new_asm_h[asmNum]->stripeMap->physInfo;
145 			while (pda) {
146 				rf_InitNode(&rodNodes[nodeNum], rf_wait,
147 				    RF_FALSE, rf_DiskReadFunc,
148 				    rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
149 				    1, 1, 4, 0, dag_h, "Rod", allocList);
150 				rodNodes[nodeNum].params[0].p = pda;
151 				rodNodes[nodeNum].params[1].p = pda->bufPtr;
152 				rodNodes[nodeNum].params[2].v = parityStripeID;
153 				rodNodes[nodeNum].params[3].v =
154 				    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
155 				     0, 0, which_ru);
156 				nodeNum++;
157 				pda = pda->next;
158 			}
159 		}
160 	}
161 	RF_ASSERT(nodeNum == nRodNodes);
162 
163 	/* Initialize the wnd nodes. */
164 	pda = asmap->physInfo;
165 	for (i = 0; i < nWndNodes; i++) {
166 		rf_InitNode(&wndNodes[i], rf_wait, RF_TRUE, rf_DiskWriteFunc,
167 		    rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
168 		    dag_h, "Wnd", allocList);
169 		RF_ASSERT(pda != NULL);
170 		wndNodes[i].params[0].p = pda;
171 		wndNodes[i].params[1].p = pda->bufPtr;
172 		wndNodes[i].params[2].v = parityStripeID;
173 		wndNodes[i].params[3].v =
174 		    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
175 		pda = pda->next;
176 	}
177 
178 	/* Initialize the redundancy node. */
179 	rf_InitNode(xorNode, rf_wait, RF_TRUE, redFunc, rf_NullNodeUndoFunc,
180 	    NULL, 1, 1, 2 * (nWndNodes + nRodNodes) + 1, 1, dag_h,
181 	    "Xr ", allocList);
182 	xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
183 	for (i = 0; i < nWndNodes; i++) {
184 		/* pda */
185 		xorNode->params[2 * i + 0] = wndNodes[i].params[0];
186 		/* buf ptr */
187 		xorNode->params[2 * i + 1] = wndNodes[i].params[1];
188 	}
189 	for (i = 0; i < nRodNodes; i++) {
190 		xorNode->params[2 * (nWndNodes + i) + 0] =
191 		    rodNodes[i].params[0];	/* pda */
192 		xorNode->params[2 * (nWndNodes + i) + 1] =
193 		    rodNodes[i].params[1];	/* buf ptr */
194 	}
195 	/* Xor node needs to get at RAID information. */
196 	xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr;
197 
198 	/*
199 	 * Look for an Rod node that reads a complete SU. If none, alloc a
200 	 * buffer to receive the parity info. Note that we can't use a new
201 	 * data buffer because it will not have gotten written when the xor
202 	 * occurs.
203 	 */
204 	for (i = 0; i < nRodNodes; i++)
205 		if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)
206 		    ->numSector == raidPtr->Layout.sectorsPerStripeUnit)
207 			break;
208 	if (i == nRodNodes) {
209 		RF_CallocAndAdd(xorNode->results[0], 1,
210 		    rf_RaidAddressToByte(raidPtr,
211 		     raidPtr->Layout.sectorsPerStripeUnit), (void *),
212 		    allocList);
213 	} else {
214 		xorNode->results[0] = rodNodes[i].params[1].p;
215 	}
216 
217 	/* Initialize the Lpo node. */
218 	rf_InitNode(lpoNode, rf_wait, RF_FALSE, rf_ParityLogOverwriteFunc,
219 	    rf_ParityLogOverwriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0,
220 	    dag_h, "Lpo", allocList);
221 
222 	lpoNode->params[0].p = asmap->parityInfo;
223 	lpoNode->params[1].p = xorNode->results[0];
224 	/* parityInfo must describe entire parity unit. */
225 	RF_ASSERT(asmap->parityInfo->next == NULL);
226 
227 	/* Connect nodes to form graph. */
228 
229 	/* Connect dag header to block node. */
230 	RF_ASSERT(dag_h->numSuccedents == 1);
231 	RF_ASSERT(blockNode->numAntecedents == 0);
232 	dag_h->succedents[0] = blockNode;
233 
234 	/* Connect the block node to the Rod nodes. */
235 	RF_ASSERT(blockNode->numSuccedents == nRodNodes + 1);
236 	for (i = 0; i < nRodNodes; i++) {
237 		RF_ASSERT(rodNodes[i].numAntecedents == 1);
238 		blockNode->succedents[i] = &rodNodes[i];
239 		rodNodes[i].antecedents[0] = blockNode;
240 		rodNodes[i].antType[0] = rf_control;
241 	}
242 
243 	/* Connect the block node to the sync node. */
244 	/* necessary if nRodNodes == 0 */
245 	RF_ASSERT(syncNode->numAntecedents == nRodNodes + 1);
246 	blockNode->succedents[nRodNodes] = syncNode;
247 	syncNode->antecedents[0] = blockNode;
248 	syncNode->antType[0] = rf_control;
249 
250 	/* Connect the Rod nodes to the syncNode. */
251 	for (i = 0; i < nRodNodes; i++) {
252 		rodNodes[i].succedents[0] = syncNode;
253 		syncNode->antecedents[1 + i] = &rodNodes[i];
254 		syncNode->antType[1 + i] = rf_control;
255 	}
256 
257 	/* Connect the sync node to the xor node. */
258 	RF_ASSERT(syncNode->numSuccedents == nWndNodes + 1);
259 	RF_ASSERT(xorNode->numAntecedents == 1);
260 	syncNode->succedents[0] = xorNode;
261 	xorNode->antecedents[0] = syncNode;
262 	xorNode->antType[0] = rf_trueData;	/* Carry forward from sync. */
263 
264 	/* Connect the sync node to the Wnd nodes. */
265 	for (i = 0; i < nWndNodes; i++) {
266 		RF_ASSERT(wndNodes->numAntecedents == 1);
267 		syncNode->succedents[1 + i] = &wndNodes[i];
268 		wndNodes[i].antecedents[0] = syncNode;
269 		wndNodes[i].antType[0] = rf_control;
270 	}
271 
272 	/* Connect the xor node to the Lpo node. */
273 	RF_ASSERT(xorNode->numSuccedents == 1);
274 	RF_ASSERT(lpoNode->numAntecedents == 1);
275 	xorNode->succedents[0] = lpoNode;
276 	lpoNode->antecedents[0] = xorNode;
277 	lpoNode->antType[0] = rf_trueData;
278 
279 	/* Connect the Wnd nodes to the unblock node. */
280 	RF_ASSERT(unblockNode->numAntecedents == nWndNodes + 1);
281 	for (i = 0; i < nWndNodes; i++) {
282 		RF_ASSERT(wndNodes->numSuccedents == 1);
283 		wndNodes[i].succedents[0] = unblockNode;
284 		unblockNode->antecedents[i] = &wndNodes[i];
285 		unblockNode->antType[i] = rf_control;
286 	}
287 
288 	/* Connect the Lpo node to the unblock node. */
289 	RF_ASSERT(lpoNode->numSuccedents == 1);
290 	lpoNode->succedents[0] = unblockNode;
291 	unblockNode->antecedents[nWndNodes] = lpoNode;
292 	unblockNode->antType[nWndNodes] = rf_control;
293 
294 	/* Connect unblock node to terminator. */
295 	RF_ASSERT(unblockNode->numSuccedents == 1);
296 	RF_ASSERT(termNode->numAntecedents == 1);
297 	RF_ASSERT(termNode->numSuccedents == 0);
298 	unblockNode->succedents[0] = termNode;
299 	termNode->antecedents[0] = unblockNode;
300 	termNode->antType[0] = rf_control;
301 }
302 
303 
304 /*****************************************************************************
305  *
306  * Creates a DAG to perform a small-write operation (either raid 5 or pq),
307  * which is as follows:
308  *
309  *				       Header
310  *				          |
311  *				        Block
312  *				    / |  ... \   \
313  *				   /  |       \   \
314  *				Rod  Rod      Rod  Rop
315  *				 | \ /| \    / |  \/ |
316  *				 |    |        |  /\ |
317  *				Wnd  Wnd      Wnd   X
318  *				 |    \       /     |
319  *				 |     \     /      |
320  *				  \     \   /      Lpo
321  *				   \     \ /       /
322  *				    +-> Unblock <-+
323  *				          |
324  *				          T
325  *
326  *
327  * R = Read, W = Write, X = Xor, o = old, n = new, d = data, p = parity.
328  * When the access spans a stripe unit boundary and is less than one SU in
329  * size, there will be two Rop -- X -- Wnp branches. I call this the
330  * "double-XOR" case.
331  * The second output from each Rod node goes to the X node. In the double-XOR
332  * case, there are exactly 2 Rod nodes, and each sends one output to one X
333  * node.
334  * There is one Rod -- Wnd -- T branch for each stripe unit being updated.
335  *
336  * The block and unblock nodes are unused. See comment above
337  * CreateFaultFreeReadDAG.
338  *
339  * Note:  This DAG ignores all the optimizations related to making the RMWs
340  *        atomic.
341  *        It also has the nasty property that none of the buffers allocated
342  *        for reading old data & parity can be freed until the XOR node fires.
343  *        Need to fix this.
344  *
345  * A null qfuncs indicates single fault tolerant.
346  *****************************************************************************/
347 
348 void
rf_CommonCreateParityLoggingSmallWriteDAG(RF_Raid_t * raidPtr,RF_AccessStripeMap_t * asmap,RF_DagHeader_t * dag_h,void * bp,RF_RaidAccessFlags_t flags,RF_AllocListElem_t * allocList,RF_RedFuncs_t * pfuncs,RF_RedFuncs_t * qfuncs)349 rf_CommonCreateParityLoggingSmallWriteDAG(RF_Raid_t *raidPtr,
350     RF_AccessStripeMap_t *asmap, RF_DagHeader_t *dag_h, void *bp,
351     RF_RaidAccessFlags_t flags, RF_AllocListElem_t *allocList,
352     RF_RedFuncs_t *pfuncs, RF_RedFuncs_t *qfuncs)
353 {
354 	RF_DagNode_t *xorNodes, *blockNode, *unblockNode, *nodes;
355 	RF_DagNode_t *readDataNodes, *readParityNodes;
356 	RF_DagNode_t *writeDataNodes, *lpuNodes;
357 	RF_DagNode_t *unlockDataNodes = NULL, *termNode;
358 	RF_PhysDiskAddr_t *pda = asmap->physInfo;
359 	int numDataNodes = asmap->numStripeUnitsAccessed;
360 	int numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
361 	int i, j, nNodes, totalNumNodes;
362 	RF_ReconUnitNum_t which_ru;
363 	int (*func) (RF_DagNode_t * node), (*undoFunc) (RF_DagNode_t * node);
364 	int (*qfunc) (RF_DagNode_t * node);
365 	char*name, *qname;
366 	RF_StripeNum_t parityStripeID =
367 	    rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
368 	     asmap->raidAddress, &which_ru);
369 	long nfaults = qfuncs ? 2 : 1;
370 	int lu_flag = (rf_enableAtomicRMW) ? 1 : 0;	/* Lock/unlock flag. */
371 
372 	if (rf_dagDebug)
373 		printf("[Creating parity-logging small-write DAG]\n");
374 	RF_ASSERT(numDataNodes > 0);
375 	RF_ASSERT(nfaults == 1);
376 	dag_h->creator = "ParityLoggingSmallWriteDAG";
377 
378 	/*
379 	 * DAG creation occurs in three steps:
380 	 * 1. Count the number of nodes in the DAG.
381 	 * 2. Create the nodes.
382 	 * 3. Initialize the nodes.
383 	 * 4. Connect the nodes.
384 	 */
385 
386 	/* Step 1. Compute number of nodes in the graph. */
387 
388 	/*
389 	 * Number of nodes: a read and write for each data unit, a redundancy
390 	 * computation node for each parity node, a read and Lpu for each
391 	 * parity unit, a block and unblock node (2), a terminator node if
392 	 * atomic RMW, an unlock node for each data and redundancy unit.
393 	 */
394 	totalNumNodes = (2 * numDataNodes) + numParityNodes +
395 	    (2 * numParityNodes) + 3;
396 	if (lu_flag)
397 		totalNumNodes += numDataNodes;
398 
399 	nNodes = numDataNodes + numParityNodes;
400 
401 	dag_h->numCommitNodes = numDataNodes + numParityNodes;
402 	dag_h->numCommits = 0;
403 	dag_h->numSuccedents = 1;
404 
405 	/* Step 2. Create the nodes. */
406 	RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t),
407 	    (RF_DagNode_t *), allocList);
408 	i = 0;
409 	blockNode = &nodes[i];
410 	i += 1;
411 	unblockNode = &nodes[i];
412 	i += 1;
413 	readDataNodes = &nodes[i];
414 	i += numDataNodes;
415 	readParityNodes = &nodes[i];
416 	i += numParityNodes;
417 	writeDataNodes = &nodes[i];
418 	i += numDataNodes;
419 	lpuNodes = &nodes[i];
420 	i += numParityNodes;
421 	xorNodes = &nodes[i];
422 	i += numParityNodes;
423 	termNode = &nodes[i];
424 	i += 1;
425 	if (lu_flag) {
426 		unlockDataNodes = &nodes[i];
427 		i += numDataNodes;
428 	}
429 	RF_ASSERT(i == totalNumNodes);
430 
431 	/* Step 3. Initialize the nodes. */
432 	/* Initialize block node (Nil). */
433 	rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
434 	    rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h,
435 	    "Nil", allocList);
436 
437 	/* Initialize unblock node (Nil). */
438 	rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
439 	    rf_NullNodeUndoFunc, NULL, 1, nNodes, 0, 0, dag_h,
440 	    "Nil", allocList);
441 
442 	/* Initialize terminatory node (Trm). */
443 	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
444 	    rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
445 
446 	/* Initialize nodes which read old data (Rod). */
447 	for (i = 0; i < numDataNodes; i++) {
448 		rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE,
449 		    rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
450 		    nNodes, 1, 4, 0, dag_h, "Rod", allocList);
451 		RF_ASSERT(pda != NULL);
452 		/* Physical disk addr desc. */
453 		readDataNodes[i].params[0].p = pda;
454 		readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h,
455 		    pda, allocList);	/* Buffer to hold old data. */
456 		readDataNodes[i].params[2].v = parityStripeID;
457 		readDataNodes[i].params[3].v =
458 		    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag,
459 		    0, which_ru);
460 		pda = pda->next;
461 		readDataNodes[i].propList[0] = NULL;
462 		readDataNodes[i].propList[1] = NULL;
463 	}
464 
465 	/* Initialize nodes which read old parity (Rop). */
466 	pda = asmap->parityInfo;
467 	i = 0;
468 	for (i = 0; i < numParityNodes; i++) {
469 		RF_ASSERT(pda != NULL);
470 		rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE,
471 		    rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
472 		    nNodes, 1, 4, 0, dag_h, "Rop", allocList);
473 		readParityNodes[i].params[0].p = pda;
474 		readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h,
475 		    pda, allocList);	/* Buffer to hold old parity. */
476 		readParityNodes[i].params[2].v = parityStripeID;
477 		readParityNodes[i].params[3].v =
478 		    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
479 		readParityNodes[i].propList[0] = NULL;
480 		pda = pda->next;
481 	}
482 
483 	/* Initialize nodes which write new data (Wnd). */
484 	pda = asmap->physInfo;
485 	for (i = 0; i < numDataNodes; i++) {
486 		RF_ASSERT(pda != NULL);
487 		rf_InitNode(&writeDataNodes[i], rf_wait, RF_TRUE,
488 		    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
489 		    rf_GenericWakeupFunc, 1, nNodes, 4, 0, dag_h,
490 		    "Wnd", allocList);
491 		/* Physical disk addr desc. */
492 		writeDataNodes[i].params[0].p = pda;
493 		/* Buffer holding new data to be written. */
494 		writeDataNodes[i].params[1].p = pda->bufPtr;
495 		writeDataNodes[i].params[2].v = parityStripeID;
496 		writeDataNodes[i].params[3].v =
497 		    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
498 
499 		if (lu_flag) {
500 			/* Initialize node to unlock the disk queue. */
501 			rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE,
502 			    rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc,
503 			    rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
504 			    "Und", allocList);
505 			/* Physical disk addr desc. */
506 			unlockDataNodes[i].params[0].p = pda;
507 			unlockDataNodes[i].params[1].v =
508 			    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0,
509 			    lu_flag, which_ru);
510 		}
511 		pda = pda->next;
512 	}
513 
514 
515 	/* Initialize nodes which compute new parity. */
516 	/*
517 	 * We use the simple XOR func in the double-XOR case, and when we're
518 	 * accessing only a portion of one stripe unit. The distinction
519 	 * between the two is that the regular XOR func assumes that the
520 	 * targbuf is a full SU in size, and examines the pda associated with
521 	 * the buffer to decide where within the buffer to XOR the data,
522 	 * whereas the simple XOR func just XORs the data into the start of
523 	 * the buffer.
524 	 */
525 	if ((numParityNodes == 2) || ((numDataNodes == 1) &&
526 	    (asmap->totalSectorsAccessed <
527 	     raidPtr->Layout.sectorsPerStripeUnit))) {
528 		func = pfuncs->simple;
529 		undoFunc = rf_NullNodeUndoFunc;
530 		name = pfuncs->SimpleName;
531 		if (qfuncs) {
532 			qfunc = qfuncs->simple;
533 			qname = qfuncs->SimpleName;
534 		}
535 	} else {
536 		func = pfuncs->regular;
537 		undoFunc = rf_NullNodeUndoFunc;
538 		name = pfuncs->RegularName;
539 		if (qfuncs) {
540 			qfunc = qfuncs->regular;
541 			qname = qfuncs->RegularName;
542 		}
543 	}
544 	/*
545 	 * Initialize the xor nodes: params are {pda,buf} from {Rod,Wnd,Rop}
546 	 * nodes, and raidPtr.
547 	 */
548 	if (numParityNodes == 2) {	/* Double-XOR case. */
549 		for (i = 0; i < numParityNodes; i++) {
550 			rf_InitNode(&xorNodes[i], rf_wait, RF_TRUE, func,
551 			    undoFunc, NULL, 1, nNodes, 7, 1, dag_h, name,
552 			    allocList);	/* No wakeup func for XOR. */
553 			xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
554 			xorNodes[i].params[0] = readDataNodes[i].params[0];
555 			xorNodes[i].params[1] = readDataNodes[i].params[1];
556 			xorNodes[i].params[2] = readParityNodes[i].params[0];
557 			xorNodes[i].params[3] = readParityNodes[i].params[1];
558 			xorNodes[i].params[4] = writeDataNodes[i].params[0];
559 			xorNodes[i].params[5] = writeDataNodes[i].params[1];
560 			xorNodes[i].params[6].p = raidPtr;
561 			/* Use old parity buf as target buf. */
562 			xorNodes[i].results[0] = readParityNodes[i].params[1].p;
563 		}
564 	} else {
565 		/* There is only one xor node in this case. */
566 		rf_InitNode(&xorNodes[0], rf_wait, RF_TRUE, func, undoFunc,
567 		    NULL, 1, nNodes,
568 		    (2 * (numDataNodes + numDataNodes + 1) + 1), 1,
569 		    dag_h, name, allocList);
570 		xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
571 		for (i = 0; i < numDataNodes + 1; i++) {
572 			/* Set up params related to Rod and Rop nodes. */
573 			xorNodes[0].params[2 * i + 0] =
574 			    readDataNodes[i].params[0];	/* pda */
575 			xorNodes[0].params[2 * i + 1] =
576 			    readDataNodes[i].params[1];	/* Buffer pointer */
577 		}
578 		for (i = 0; i < numDataNodes; i++) {
579 			/* Set up params related to Wnd and Wnp nodes. */
580 			xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] =
581 			    writeDataNodes[i].params[0]; /* pda */
582 			xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] =
583 			    writeDataNodes[i].params[1]; /* Buffer pointer */
584 		}
585 		xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p =
586 		    raidPtr;	/* Xor node needs to get at RAID information. */
587 		xorNodes[0].results[0] = readParityNodes[0].params[1].p;
588 	}
589 
590 	/* Initialize the log node(s). */
591 	pda = asmap->parityInfo;
592 	for (i = 0; i < numParityNodes; i++) {
593 		RF_ASSERT(pda);
594 		rf_InitNode(&lpuNodes[i], rf_wait, RF_FALSE,
595 		    rf_ParityLogUpdateFunc, rf_ParityLogUpdateUndoFunc,
596 		    rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Lpu", allocList);
597 		lpuNodes[i].params[0].p = pda;	/* PhysDiskAddr of parity. */
598 		/* Buffer pointer to parity. */
599 		lpuNodes[i].params[1].p = xorNodes[i].results[0];
600 		pda = pda->next;
601 	}
602 
603 
604 	/* Step 4. Connect the nodes. */
605 
606 	/* Connect header to block node. */
607 	RF_ASSERT(dag_h->numSuccedents == 1);
608 	RF_ASSERT(blockNode->numAntecedents == 0);
609 	dag_h->succedents[0] = blockNode;
610 
611 	/* Connect block node to read old data nodes. */
612 	RF_ASSERT(blockNode->numSuccedents == (numDataNodes + numParityNodes));
613 	for (i = 0; i < numDataNodes; i++) {
614 		blockNode->succedents[i] = &readDataNodes[i];
615 		RF_ASSERT(readDataNodes[i].numAntecedents == 1);
616 		readDataNodes[i].antecedents[0] = blockNode;
617 		readDataNodes[i].antType[0] = rf_control;
618 	}
619 
620 	/* Connect block node to read old parity nodes. */
621 	for (i = 0; i < numParityNodes; i++) {
622 		blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
623 		RF_ASSERT(readParityNodes[i].numAntecedents == 1);
624 		readParityNodes[i].antecedents[0] = blockNode;
625 		readParityNodes[i].antType[0] = rf_control;
626 	}
627 
628 	/* Connect read old data nodes to write new data nodes. */
629 	for (i = 0; i < numDataNodes; i++) {
630 		RF_ASSERT(readDataNodes[i].numSuccedents ==
631 		          numDataNodes + numParityNodes);
632 		for (j = 0; j < numDataNodes; j++) {
633 			RF_ASSERT(writeDataNodes[j].numAntecedents ==
634 			          numDataNodes + numParityNodes);
635 			readDataNodes[i].succedents[j] = &writeDataNodes[j];
636 			writeDataNodes[j].antecedents[i] = &readDataNodes[i];
637 			if (i == j)
638 				writeDataNodes[j].antType[i] = rf_antiData;
639 			else
640 				writeDataNodes[j].antType[i] = rf_control;
641 		}
642 	}
643 
644 	/* Connect read old data nodes to xor nodes. */
645 	for (i = 0; i < numDataNodes; i++)
646 		for (j = 0; j < numParityNodes; j++) {
647 			RF_ASSERT(xorNodes[j].numAntecedents ==
648 			          numDataNodes + numParityNodes);
649 			readDataNodes[i].succedents[numDataNodes + j] =
650 			    &xorNodes[j];
651 			xorNodes[j].antecedents[i] = &readDataNodes[i];
652 			xorNodes[j].antType[i] = rf_trueData;
653 		}
654 
655 	/* Connect read old parity nodes to write new data nodes. */
656 	for (i = 0; i < numParityNodes; i++) {
657 		RF_ASSERT(readParityNodes[i].numSuccedents ==
658 		          numDataNodes + numParityNodes);
659 		for (j = 0; j < numDataNodes; j++) {
660 			readParityNodes[i].succedents[j] = &writeDataNodes[j];
661 			writeDataNodes[j].antecedents[numDataNodes + i] =
662 			    &readParityNodes[i];
663 			writeDataNodes[j].antType[numDataNodes + i] =
664 			    rf_control;
665 		}
666 	}
667 
668 	/* Connect read old parity nodes to xor nodes. */
669 	for (i = 0; i < numParityNodes; i++)
670 		for (j = 0; j < numParityNodes; j++) {
671 			readParityNodes[i].succedents[numDataNodes + j] =
672 			    &xorNodes[j];
673 			xorNodes[j].antecedents[numDataNodes + i] =
674 			    &readParityNodes[i];
675 			xorNodes[j].antType[numDataNodes + i] = rf_trueData;
676 		}
677 
678 	/* Connect xor nodes to write new parity nodes. */
679 	for (i = 0; i < numParityNodes; i++) {
680 		RF_ASSERT(xorNodes[i].numSuccedents == 1);
681 		RF_ASSERT(lpuNodes[i].numAntecedents == 1);
682 		xorNodes[i].succedents[0] = &lpuNodes[i];
683 		lpuNodes[i].antecedents[0] = &xorNodes[i];
684 		lpuNodes[i].antType[0] = rf_trueData;
685 	}
686 
687 	for (i = 0; i < numDataNodes; i++) {
688 		if (lu_flag) {
689 			/* Connect write new data nodes to unlock nodes. */
690 			RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
691 			RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
692 			writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
693 			unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
694 			unlockDataNodes[i].antType[0] = rf_control;
695 
696 			/* Connect unlock nodes to unblock node. */
697 			RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
698 			RF_ASSERT(unblockNode->numAntecedents ==
699 			          (numDataNodes + (nfaults * numParityNodes)));
700 			unlockDataNodes[i].succedents[0] = unblockNode;
701 			unblockNode->antecedents[i] = &unlockDataNodes[i];
702 			unblockNode->antType[i] = rf_control;
703 		} else {
704 			/* Connect write new data nodes to unblock node. */
705 			RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
706 			RF_ASSERT(unblockNode->numAntecedents ==
707 			          (numDataNodes + (nfaults * numParityNodes)));
708 			writeDataNodes[i].succedents[0] = unblockNode;
709 			unblockNode->antecedents[i] = &writeDataNodes[i];
710 			unblockNode->antType[i] = rf_control;
711 		}
712 	}
713 
714 	/* Connect write new parity nodes to unblock node. */
715 	for (i = 0; i < numParityNodes; i++) {
716 		RF_ASSERT(lpuNodes[i].numSuccedents == 1);
717 		lpuNodes[i].succedents[0] = unblockNode;
718 		unblockNode->antecedents[numDataNodes + i] = &lpuNodes[i];
719 		unblockNode->antType[numDataNodes + i] = rf_control;
720 	}
721 
722 	/* Connect unblock node to terminator. */
723 	RF_ASSERT(unblockNode->numSuccedents == 1);
724 	RF_ASSERT(termNode->numAntecedents == 1);
725 	RF_ASSERT(termNode->numSuccedents == 0);
726 	unblockNode->succedents[0] = termNode;
727 	termNode->antecedents[0] = unblockNode;
728 	termNode->antType[0] = rf_control;
729 }
730 
731 
732 void
rf_CreateParityLoggingSmallWriteDAG(RF_Raid_t * raidPtr,RF_AccessStripeMap_t * asmap,RF_DagHeader_t * dag_h,void * bp,RF_RaidAccessFlags_t flags,RF_AllocListElem_t * allocList,RF_RedFuncs_t * pfuncs,RF_RedFuncs_t * qfuncs)733 rf_CreateParityLoggingSmallWriteDAG(RF_Raid_t *raidPtr,
734     RF_AccessStripeMap_t *asmap, RF_DagHeader_t *dag_h, void *bp,
735     RF_RaidAccessFlags_t flags, RF_AllocListElem_t *allocList,
736     RF_RedFuncs_t *pfuncs, RF_RedFuncs_t *qfuncs)
737 {
738 	dag_h->creator = "ParityLoggingSmallWriteDAG";
739 	rf_CommonCreateParityLoggingSmallWriteDAG(raidPtr, asmap, dag_h, bp,
740 	    flags, allocList, &rf_xorFuncs, NULL);
741 }
742 
743 
744 void
rf_CreateParityLoggingLargeWriteDAG(RF_Raid_t * raidPtr,RF_AccessStripeMap_t * asmap,RF_DagHeader_t * dag_h,void * bp,RF_RaidAccessFlags_t flags,RF_AllocListElem_t * allocList,int nfaults,int (* redFunc)(RF_DagNode_t *))745 rf_CreateParityLoggingLargeWriteDAG(RF_Raid_t *raidPtr,
746     RF_AccessStripeMap_t *asmap, RF_DagHeader_t *dag_h, void *bp,
747     RF_RaidAccessFlags_t flags, RF_AllocListElem_t *allocList, int nfaults,
748     int (*redFunc) (RF_DagNode_t *))
749 {
750 	dag_h->creator = "ParityLoggingSmallWriteDAG";
751 	rf_CommonCreateParityLoggingLargeWriteDAG(raidPtr, asmap, dag_h, bp,
752 	    flags, allocList, 1, rf_RegularXorFunc);
753 }
754 #endif	/* RF_INCLUDE_PARITYLOGGING > 0 */
755