1 /* $OpenBSD: rf_dagffwr.c,v 1.5 2002/12/16 07:01:03 tdeval Exp $ */
2 /* $NetBSD: rf_dagffwr.c,v 1.5 2000/01/07 03:40:58 oster Exp $ */
3
4 /*
5 * Copyright (c) 1995 Carnegie-Mellon University.
6 * All rights reserved.
7 *
8 * Author: Mark Holland, Daniel Stodolsky, William V. Courtright II
9 *
10 * Permission to use, copy, modify and distribute this software and
11 * its documentation is hereby granted, provided that both the copyright
12 * notice and this permission notice appear in all copies of the
13 * software, derivative works or modified versions, and any portions
14 * thereof, and that both notices appear in supporting documentation.
15 *
16 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
17 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
18 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
19 *
20 * Carnegie Mellon requests users of this software to return to
21 *
22 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU
23 * School of Computer Science
24 * Carnegie Mellon University
25 * Pittsburgh PA 15213-3890
26 *
27 * any improvements or extensions that they make and grant Carnegie the
28 * rights to redistribute these changes.
29 */
30
/*
 * rf_dagffwr.c
 *
 * Code for creating fault-free write DAGs.
 *
 */
37
38 #include "rf_types.h"
39 #include "rf_raid.h"
40 #include "rf_dag.h"
41 #include "rf_dagutils.h"
42 #include "rf_dagfuncs.h"
43 #include "rf_debugMem.h"
44 #include "rf_dagffrd.h"
45 #include "rf_memchunk.h"
46 #include "rf_general.h"
47 #include "rf_dagffwr.h"
48
/*****************************************************************************
 *
 * General comments on DAG creation:
 *
 * All DAGs in this file use roll-away error recovery. Each DAG has a single
 * commit node, usually called "Cmt." If an error occurs before the Cmt node
 * is reached, the execution engine will halt forward execution and work
 * backward through the graph, executing the undo functions. Provided that
 * each node in the graph prior to the Cmt node is undoable and atomic - or -
 * makes no changes to permanent state, the graph will fail atomically.
 * If an error occurs after the Cmt node executes, the engine will roll
 * forward through the graph, blindly executing nodes until it reaches the
 * end. If a graph reaches the end, it is assumed to have completed
 * successfully.
 *
 * A graph has only one Cmt node.
 *
 *****************************************************************************/
66
67
68 /*****************************************************************************
69 *
70 * The following wrappers map the standard DAG creation interface to the
71 * DAG creation routines. Additionally, these wrappers enable experimentation
72 * with new DAG structures by providing an extra level of indirection, allowing
73 * the DAG creation routines to be replaced at this single point.
74 *
75 *****************************************************************************/
76
77
78 void
rf_CreateNonRedundantWriteDAG(RF_Raid_t * raidPtr,RF_AccessStripeMap_t * asmap,RF_DagHeader_t * dag_h,void * bp,RF_RaidAccessFlags_t flags,RF_AllocListElem_t * allocList,RF_IoType_t type)79 rf_CreateNonRedundantWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
80 RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
81 RF_AllocListElem_t *allocList, RF_IoType_t type)
82 {
83 rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
84 RF_IO_TYPE_WRITE);
85 }
86
87 void
rf_CreateRAID0WriteDAG(RF_Raid_t * raidPtr,RF_AccessStripeMap_t * asmap,RF_DagHeader_t * dag_h,void * bp,RF_RaidAccessFlags_t flags,RF_AllocListElem_t * allocList,RF_IoType_t type)88 rf_CreateRAID0WriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
89 RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
90 RF_AllocListElem_t *allocList, RF_IoType_t type)
91 {
92 rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
93 RF_IO_TYPE_WRITE);
94 }
95
96 void
rf_CreateSmallWriteDAG(RF_Raid_t * raidPtr,RF_AccessStripeMap_t * asmap,RF_DagHeader_t * dag_h,void * bp,RF_RaidAccessFlags_t flags,RF_AllocListElem_t * allocList)97 rf_CreateSmallWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
98 RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
99 RF_AllocListElem_t *allocList)
100 {
101 /* "normal" rollaway. */
102 rf_CommonCreateSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags,
103 allocList, &rf_xorFuncs, NULL);
104 }
105
106 void
rf_CreateLargeWriteDAG(RF_Raid_t * raidPtr,RF_AccessStripeMap_t * asmap,RF_DagHeader_t * dag_h,void * bp,RF_RaidAccessFlags_t flags,RF_AllocListElem_t * allocList)107 rf_CreateLargeWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
108 RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
109 RF_AllocListElem_t *allocList)
110 {
111 /* "normal" rollaway. */
112 rf_CommonCreateLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags,
113 allocList, 1, rf_RegularXorFunc, RF_TRUE);
114 }
115
116
117 /*****************************************************************************
118 *
119 * DAG creation code begins here.
120 *
121 *****************************************************************************/
122
123
/*****************************************************************************
 *
 * Creates a DAG to perform a large-write operation:
 *
 *            / Rod \           / Wnd \
 * H -- block - Rod - Xor - Cmt - Wnd --- T
 *            \ Rod /           \ Wnp /
 *                               \[Wnq]/
 *
 * The XOR node also does the Q calculation in the P+Q architecture.
 * All nodes that are before the commit node (Cmt) are assumed to be atomic
 * and undoable - or - they make no changes to permanent state.
 *
 * Rod = read old data (the portions of the stripe not being written)
 * Cmt = commit node
 * Wnp = write new parity
 * Wnd = write new data
 * Wnq = write new "q"
 * [] denotes optional segments in the graph.
 *
 * Parameters:	raidPtr   - description of the physical array
 *		asmap     - logical & physical addresses for this access
 *		dag_h     - header of the DAG being constructed
 *		bp        - buffer ptr (holds write data)
 *		flags     - general flags (e.g. disk locking)
 *		allocList - list of memory allocated in DAG creation
 *		nfaults   - number of faults array can tolerate
 *			    (equal to # redundancy units in stripe)
 *		redFunc   - redundancy generating function (computes P,
 *			    and also Q when nfaults == 2)
 *		allowBufferRecycle - if true, a Rod buffer covering a full
 *			    stripe unit may be reused as the parity target
 *			    instead of allocating a new buffer
 *
 *****************************************************************************/
154
155 void
rf_CommonCreateLargeWriteDAG(RF_Raid_t * raidPtr,RF_AccessStripeMap_t * asmap,RF_DagHeader_t * dag_h,void * bp,RF_RaidAccessFlags_t flags,RF_AllocListElem_t * allocList,int nfaults,int (* redFunc)(RF_DagNode_t *),int allowBufferRecycle)156 rf_CommonCreateLargeWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
157 RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
158 RF_AllocListElem_t *allocList, int nfaults, int (*redFunc) (RF_DagNode_t *),
159 int allowBufferRecycle)
160 {
161 RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
162 RF_DagNode_t *wnqNode, *blockNode, *commitNode, *termNode;
163 int nWndNodes, nRodNodes, i, nodeNum, asmNum;
164 RF_AccessStripeMapHeader_t *new_asm_h[2];
165 RF_StripeNum_t parityStripeID;
166 char *sosBuffer, *eosBuffer;
167 RF_ReconUnitNum_t which_ru;
168 RF_RaidLayout_t *layoutPtr;
169 RF_PhysDiskAddr_t *pda;
170
171 layoutPtr = &(raidPtr->Layout);
172 parityStripeID = rf_RaidAddressToParityStripeID(layoutPtr,
173 asmap->raidAddress, &which_ru);
174
175 if (rf_dagDebug) {
176 printf("[Creating large-write DAG]\n");
177 }
178 dag_h->creator = "LargeWriteDAG";
179
180 dag_h->numCommitNodes = 1;
181 dag_h->numCommits = 0;
182 dag_h->numSuccedents = 1;
183
184 /* Alloc the nodes: Wnd, xor, commit, block, term, and Wnp. */
185 nWndNodes = asmap->numStripeUnitsAccessed;
186 RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t),
187 (RF_DagNode_t *), allocList);
188 i = 0;
189 wndNodes = &nodes[i];
190 i += nWndNodes;
191 xorNode = &nodes[i];
192 i += 1;
193 wnpNode = &nodes[i];
194 i += 1;
195 blockNode = &nodes[i];
196 i += 1;
197 commitNode = &nodes[i];
198 i += 1;
199 termNode = &nodes[i];
200 i += 1;
201 if (nfaults == 2) {
202 wnqNode = &nodes[i];
203 i += 1;
204 } else {
205 wnqNode = NULL;
206 }
207 rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h,
208 new_asm_h, &nRodNodes, &sosBuffer, &eosBuffer, allocList);
209 if (nRodNodes > 0) {
210 RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t),
211 (RF_DagNode_t *), allocList);
212 } else {
213 rodNodes = NULL;
214 }
215
216 /* Begin node initialization. */
217 if (nRodNodes > 0) {
218 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
219 rf_NullNodeUndoFunc, NULL, nRodNodes, 0, 0, 0, dag_h,
220 "Nil", allocList);
221 } else {
222 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
223 rf_NullNodeUndoFunc, NULL, 1, 0, 0, 0, dag_h, "Nil",
224 allocList);
225 }
226
227 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc,
228 rf_NullNodeUndoFunc, NULL, nWndNodes + nfaults, 1, 0, 0,
229 dag_h, "Cmt", allocList);
230 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
231 rf_TerminateUndoFunc, NULL, 0, nWndNodes + nfaults, 0, 0,
232 dag_h, "Trm", allocList);
233
234 /* Initialize the Rod nodes. */
235 for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
236 if (new_asm_h[asmNum]) {
237 pda = new_asm_h[asmNum]->stripeMap->physInfo;
238 while (pda) {
239 rf_InitNode(&rodNodes[nodeNum], rf_wait,
240 RF_FALSE, rf_DiskReadFunc,
241 rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
242 1, 1, 4, 0, dag_h, "Rod", allocList);
243 rodNodes[nodeNum].params[0].p = pda;
244 rodNodes[nodeNum].params[1].p = pda->bufPtr;
245 rodNodes[nodeNum].params[2].v = parityStripeID;
246 rodNodes[nodeNum].params[3].v =
247 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
248 0, 0, which_ru);
249 nodeNum++;
250 pda = pda->next;
251 }
252 }
253 }
254 RF_ASSERT(nodeNum == nRodNodes);
255
256 /* Initialize the wnd nodes. */
257 pda = asmap->physInfo;
258 for (i = 0; i < nWndNodes; i++) {
259 rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
260 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
261 dag_h, "Wnd", allocList);
262 RF_ASSERT(pda != NULL);
263 wndNodes[i].params[0].p = pda;
264 wndNodes[i].params[1].p = pda->bufPtr;
265 wndNodes[i].params[2].v = parityStripeID;
266 wndNodes[i].params[3].v =
267 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
268 pda = pda->next;
269 }
270
271 /* Initialize the redundancy node. */
272 if (nRodNodes > 0) {
273 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc,
274 rf_NullNodeUndoFunc, NULL, 1, nRodNodes,
275 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h,
276 "Xr ", allocList);
277 } else {
278 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc,
279 rf_NullNodeUndoFunc, NULL, 1, 1,
280 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h,
281 "Xr ", allocList);
282 }
283 xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
284 for (i = 0; i < nWndNodes; i++) {
285 xorNode->params[2 * i + 0] =
286 wndNodes[i].params[0]; /* pda */
287 xorNode->params[2 * i + 1] =
288 wndNodes[i].params[1]; /* buf ptr */
289 }
290 for (i = 0; i < nRodNodes; i++) {
291 xorNode->params[2 * (nWndNodes + i) + 0] =
292 rodNodes[i].params[0]; /* pda */
293 xorNode->params[2 * (nWndNodes + i) + 1] =
294 rodNodes[i].params[1]; /* buf ptr */
295 }
296 /* Xor node needs to get at RAID information. */
297 xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr;
298
299 /*
300 * Look for an Rod node that reads a complete SU. If none, alloc
301 * a buffer to receive the parity info. Note that we can't use a
302 * new data buffer because it will not have gotten written when
303 * the xor occurs.
304 */
305 if (allowBufferRecycle) {
306 for (i = 0; i < nRodNodes; i++) {
307 if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)
308 ->numSector == raidPtr->Layout.sectorsPerStripeUnit)
309 break;
310 }
311 }
312 if ((!allowBufferRecycle) || (i == nRodNodes)) {
313 RF_CallocAndAdd(xorNode->results[0], 1,
314 rf_RaidAddressToByte(raidPtr,
315 raidPtr->Layout.sectorsPerStripeUnit),
316 (void *), allocList);
317 } else {
318 xorNode->results[0] = rodNodes[i].params[1].p;
319 }
320
321 /* Initialize the Wnp node. */
322 rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc,
323 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
324 dag_h, "Wnp", allocList);
325 wnpNode->params[0].p = asmap->parityInfo;
326 wnpNode->params[1].p = xorNode->results[0];
327 wnpNode->params[2].v = parityStripeID;
328 wnpNode->params[3].v =
329 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
330 /* parityInfo must describe entire parity unit. */
331 RF_ASSERT(asmap->parityInfo->next == NULL);
332
333 if (nfaults == 2) {
334 /*
335 * We never try to recycle a buffer for the Q calculation
336 * in addition to the parity. This would cause two buffers
337 * to get smashed during the P and Q calculation, guaranteeing
338 * one would be wrong.
339 */
340 RF_CallocAndAdd(xorNode->results[1], 1,
341 rf_RaidAddressToByte(raidPtr,
342 raidPtr->Layout.sectorsPerStripeUnit),
343 (void *), allocList);
344 rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc,
345 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
346 dag_h, "Wnq", allocList);
347 wnqNode->params[0].p = asmap->qInfo;
348 wnqNode->params[1].p = xorNode->results[1];
349 wnqNode->params[2].v = parityStripeID;
350 wnqNode->params[3].v =
351 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
352 /* parityInfo must describe entire parity unit. */
353 RF_ASSERT(asmap->parityInfo->next == NULL);
354 }
355 /*
356 * Connect nodes to form graph.
357 */
358
359 /* Connect dag header to block node. */
360 RF_ASSERT(blockNode->numAntecedents == 0);
361 dag_h->succedents[0] = blockNode;
362
363 if (nRodNodes > 0) {
364 /* Connect the block node to the Rod nodes. */
365 RF_ASSERT(blockNode->numSuccedents == nRodNodes);
366 RF_ASSERT(xorNode->numAntecedents == nRodNodes);
367 for (i = 0; i < nRodNodes; i++) {
368 RF_ASSERT(rodNodes[i].numAntecedents == 1);
369 blockNode->succedents[i] = &rodNodes[i];
370 rodNodes[i].antecedents[0] = blockNode;
371 rodNodes[i].antType[0] = rf_control;
372
373 /* Connect the Rod nodes to the Xor node. */
374 RF_ASSERT(rodNodes[i].numSuccedents == 1);
375 rodNodes[i].succedents[0] = xorNode;
376 xorNode->antecedents[i] = &rodNodes[i];
377 xorNode->antType[i] = rf_trueData;
378 }
379 } else {
380 /* Connect the block node to the Xor node. */
381 RF_ASSERT(blockNode->numSuccedents == 1);
382 RF_ASSERT(xorNode->numAntecedents == 1);
383 blockNode->succedents[0] = xorNode;
384 xorNode->antecedents[0] = blockNode;
385 xorNode->antType[0] = rf_control;
386 }
387
388 /* Connect the xor node to the commit node. */
389 RF_ASSERT(xorNode->numSuccedents == 1);
390 RF_ASSERT(commitNode->numAntecedents == 1);
391 xorNode->succedents[0] = commitNode;
392 commitNode->antecedents[0] = xorNode;
393 commitNode->antType[0] = rf_control;
394
395 /* Connect the commit node to the write nodes. */
396 RF_ASSERT(commitNode->numSuccedents == nWndNodes + nfaults);
397 for (i = 0; i < nWndNodes; i++) {
398 RF_ASSERT(wndNodes->numAntecedents == 1);
399 commitNode->succedents[i] = &wndNodes[i];
400 wndNodes[i].antecedents[0] = commitNode;
401 wndNodes[i].antType[0] = rf_control;
402 }
403 RF_ASSERT(wnpNode->numAntecedents == 1);
404 commitNode->succedents[nWndNodes] = wnpNode;
405 wnpNode->antecedents[0] = commitNode;
406 wnpNode->antType[0] = rf_trueData;
407 if (nfaults == 2) {
408 RF_ASSERT(wnqNode->numAntecedents == 1);
409 commitNode->succedents[nWndNodes + 1] = wnqNode;
410 wnqNode->antecedents[0] = commitNode;
411 wnqNode->antType[0] = rf_trueData;
412 }
413 /* Connect the write nodes to the term node. */
414 RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
415 RF_ASSERT(termNode->numSuccedents == 0);
416 for (i = 0; i < nWndNodes; i++) {
417 RF_ASSERT(wndNodes->numSuccedents == 1);
418 wndNodes[i].succedents[0] = termNode;
419 termNode->antecedents[i] = &wndNodes[i];
420 termNode->antType[i] = rf_control;
421 }
422 RF_ASSERT(wnpNode->numSuccedents == 1);
423 wnpNode->succedents[0] = termNode;
424 termNode->antecedents[nWndNodes] = wnpNode;
425 termNode->antType[nWndNodes] = rf_control;
426 if (nfaults == 2) {
427 RF_ASSERT(wnqNode->numSuccedents == 1);
428 wnqNode->succedents[0] = termNode;
429 termNode->antecedents[nWndNodes + 1] = wnqNode;
430 termNode->antType[nWndNodes + 1] = rf_control;
431 }
432 }
/*****************************************************************************
 *
 * Create a DAG to perform a small-write operation (either raid 5 or pq),
 * which is as follows:
 *
 * Hdr -> Nil -> Rop -> Xor -> Cmt ----> Wnp [Unp] --> Trm
 *            \- Rod X      /     \----> Wnd [Und]-/
 *           [\- Rod X     /       \---> Wnd [Und]-/]
 *           [\- Roq -> Q /         \--> Wnq [Unq]-/]
 *
 * Rop = read old parity
 * Rod = read old data
 * Roq = read old "q"
 * Cmt = commit node
 * Und = unlock data disk
 * Unp = unlock parity disk
 * Unq = unlock q disk
 * Wnp = write new parity
 * Wnd = write new data
 * Wnq = write new "q"
 * [ ] denotes optional segments in the graph.
 *
 * The unlock nodes (Und, Unp, Unq) are created only when rf_enableAtomicRMW
 * is set.
 *
 * Parameters:	raidPtr   - description of the physical array
 *		asmap     - logical & physical addresses for this access
 *		dag_h     - header of the DAG being constructed
 *		bp        - buffer ptr (holds write data)
 *		flags     - general flags (e.g. disk locking)
 *		allocList - list of memory allocated in DAG creation
 *		pfuncs    - list of parity generating functions
 *		qfuncs    - list of q generating functions
 *
 * A NULL qfuncs indicates a single-fault-tolerant array.
 *****************************************************************************/
465
466 void
rf_CommonCreateSmallWriteDAG(RF_Raid_t * raidPtr,RF_AccessStripeMap_t * asmap,RF_DagHeader_t * dag_h,void * bp,RF_RaidAccessFlags_t flags,RF_AllocListElem_t * allocList,RF_RedFuncs_t * pfuncs,RF_RedFuncs_t * qfuncs)467 rf_CommonCreateSmallWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
468 RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
469 RF_AllocListElem_t *allocList, RF_RedFuncs_t *pfuncs, RF_RedFuncs_t *qfuncs)
470 {
471 RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
472 RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
473 RF_DagNode_t *xorNodes, *qNodes, *blockNode, *commitNode, *nodes;
474 RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
475 int i, j, nNodes, totalNumNodes, lu_flag;
476 RF_ReconUnitNum_t which_ru;
477 int (*func) (RF_DagNode_t *);
478 int (*undoFunc) (RF_DagNode_t *);
479 int (*qfunc) (RF_DagNode_t *);
480 int numDataNodes, numParityNodes;
481 RF_StripeNum_t parityStripeID;
482 RF_PhysDiskAddr_t *pda;
483 char *name, *qname;
484 long nfaults;
485
486 nfaults = qfuncs ? 2 : 1;
487 lu_flag = (rf_enableAtomicRMW) ? 1 : 0; /* Lock/unlock flag. */
488
489 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
490 asmap->raidAddress, &which_ru);
491 pda = asmap->physInfo;
492 numDataNodes = asmap->numStripeUnitsAccessed;
493 numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
494
495 if (rf_dagDebug) {
496 printf("[Creating small-write DAG]\n");
497 }
498 RF_ASSERT(numDataNodes > 0);
499 dag_h->creator = "SmallWriteDAG";
500
501 dag_h->numCommitNodes = 1;
502 dag_h->numCommits = 0;
503 dag_h->numSuccedents = 1;
504
505 /*
506 * DAG creation occurs in four steps:
507 * 1. Count the number of nodes in the DAG.
508 * 2. Create the nodes.
509 * 3. Initialize the nodes.
510 * 4. Connect the nodes.
511 */
512
513 /*
514 * Step 1. Compute number of nodes in the graph.
515 */
516
517 /*
518 * Number of nodes: a read and write for each data unit, a redundancy
519 * computation node for each parity node (nfaults * nparity), a read
520 * and write for each parity unit, a block and commit node (2), a
521 * terminate node if atomic RMW, an unlock node for each
522 * data/redundancy unit.
523 */
524 totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes)
525 + (nfaults * 2 * numParityNodes) + 3;
526 if (lu_flag) {
527 totalNumNodes += (numDataNodes + (nfaults * numParityNodes));
528 }
529 /*
530 * Step 2. Create the nodes.
531 */
532 RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t),
533 (RF_DagNode_t *), allocList);
534 i = 0;
535 blockNode = &nodes[i];
536 i += 1;
537 commitNode = &nodes[i];
538 i += 1;
539 readDataNodes = &nodes[i];
540 i += numDataNodes;
541 readParityNodes = &nodes[i];
542 i += numParityNodes;
543 writeDataNodes = &nodes[i];
544 i += numDataNodes;
545 writeParityNodes = &nodes[i];
546 i += numParityNodes;
547 xorNodes = &nodes[i];
548 i += numParityNodes;
549 termNode = &nodes[i];
550 i += 1;
551 if (lu_flag) {
552 unlockDataNodes = &nodes[i];
553 i += numDataNodes;
554 unlockParityNodes = &nodes[i];
555 i += numParityNodes;
556 } else {
557 unlockDataNodes = unlockParityNodes = NULL;
558 }
559 if (nfaults == 2) {
560 readQNodes = &nodes[i];
561 i += numParityNodes;
562 writeQNodes = &nodes[i];
563 i += numParityNodes;
564 qNodes = &nodes[i];
565 i += numParityNodes;
566 if (lu_flag) {
567 unlockQNodes = &nodes[i];
568 i += numParityNodes;
569 } else {
570 unlockQNodes = NULL;
571 }
572 } else {
573 readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;
574 }
575 RF_ASSERT(i == totalNumNodes);
576
577 /*
578 * Step 3. Initialize the nodes.
579 */
580 /* Initialize block node (Nil). */
581 nNodes = numDataNodes + (nfaults * numParityNodes);
582 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
583 rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h,
584 "Nil", allocList);
585
586 /* Initialize commit node (Cmt). */
587 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc,
588 rf_NullNodeUndoFunc, NULL, nNodes, (nfaults * numParityNodes),
589 0, 0, dag_h, "Cmt", allocList);
590
591 /* Initialize terminate node (Trm). */
592 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
593 rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0, dag_h,
594 "Trm", allocList);
595
596 /* Initialize nodes which read old data (Rod). */
597 for (i = 0; i < numDataNodes; i++) {
598 rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE,
599 rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
600 (nfaults * numParityNodes), 1, 4, 0, dag_h, "Rod",
601 allocList);
602 RF_ASSERT(pda != NULL);
603 /* Physical disk addr desc. */
604 readDataNodes[i].params[0].p = pda;
605 /* Buffer to hold old data. */
606 readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
607 dag_h, pda, allocList);
608 readDataNodes[i].params[2].v = parityStripeID;
609 readDataNodes[i].params[3].v =
610 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
611 lu_flag, 0, which_ru);
612 pda = pda->next;
613 for (j = 0; j < readDataNodes[i].numSuccedents; j++) {
614 readDataNodes[i].propList[j] = NULL;
615 }
616 }
617
618 /* Initialize nodes which read old parity (Rop). */
619 pda = asmap->parityInfo;
620 i = 0;
621 for (i = 0; i < numParityNodes; i++) {
622 RF_ASSERT(pda != NULL);
623 rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE,
624 rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
625 numParityNodes, 1, 4, 0, dag_h, "Rop", allocList);
626 readParityNodes[i].params[0].p = pda;
627 /* Buffer to hold old parity. */
628 readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
629 dag_h, pda, allocList);
630 readParityNodes[i].params[2].v = parityStripeID;
631 readParityNodes[i].params[3].v =
632 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
633 lu_flag, 0, which_ru);
634 pda = pda->next;
635 for (j = 0; j < readParityNodes[i].numSuccedents; j++) {
636 readParityNodes[i].propList[0] = NULL;
637 }
638 }
639
640 /* Initialize nodes which read old Q (Roq). */
641 if (nfaults == 2) {
642 pda = asmap->qInfo;
643 for (i = 0; i < numParityNodes; i++) {
644 RF_ASSERT(pda != NULL);
645 rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE,
646 rf_DiskReadFunc, rf_DiskReadUndoFunc,
647 rf_GenericWakeupFunc, numParityNodes,
648 1, 4, 0, dag_h, "Roq", allocList);
649 readQNodes[i].params[0].p = pda;
650 /* Buffer to hold old Q. */
651 readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
652 dag_h, pda, allocList);
653 readQNodes[i].params[2].v = parityStripeID;
654 readQNodes[i].params[3].v =
655 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
656 lu_flag, 0, which_ru);
657 pda = pda->next;
658 for (j = 0; j < readQNodes[i].numSuccedents; j++) {
659 readQNodes[i].propList[0] = NULL;
660 }
661 }
662 }
663 /* Initialize nodes which write new data (Wnd). */
664 pda = asmap->physInfo;
665 for (i = 0; i < numDataNodes; i++) {
666 RF_ASSERT(pda != NULL);
667 rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE,
668 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
669 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
670 "Wnd", allocList);
671 /* Physical disk addr desc. */
672 writeDataNodes[i].params[0].p = pda;
673 /* Buffer holding new data to be written. */
674 writeDataNodes[i].params[1].p = pda->bufPtr;
675 writeDataNodes[i].params[2].v = parityStripeID;
676 writeDataNodes[i].params[3].v =
677 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
678 if (lu_flag) {
679 /* Initialize node to unlock the disk queue. */
680 rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE,
681 rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc,
682 rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
683 "Und", allocList);
684 /* Physical disk addr desc. */
685 unlockDataNodes[i].params[0].p = pda;
686 unlockDataNodes[i].params[1].v =
687 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
688 0, lu_flag, which_ru);
689 }
690 pda = pda->next;
691 }
692
693 /*
694 * Initialize nodes which compute new parity and Q.
695 */
696 /*
697 * We use the simple XOR func in the double-XOR case, and when
698 * we're accessing only a portion of one stripe unit.
699 * The distinction between the two is that the regular XOR func
700 * assumes that the targbuf is a full SU in size, and examines
701 * the pda associated with the buffer to decide where within
702 * the buffer to XOR the data, whereas the simple XOR func just
703 * XORs the data into the start of the buffer.
704 */
705 if ((numParityNodes == 2) || ((numDataNodes == 1) &&
706 (asmap->totalSectorsAccessed <
707 raidPtr->Layout.sectorsPerStripeUnit))) {
708 func = pfuncs->simple;
709 undoFunc = rf_NullNodeUndoFunc;
710 name = pfuncs->SimpleName;
711 if (qfuncs) {
712 qfunc = qfuncs->simple;
713 qname = qfuncs->SimpleName;
714 } else {
715 qfunc = NULL;
716 qname = NULL;
717 }
718 } else {
719 func = pfuncs->regular;
720 undoFunc = rf_NullNodeUndoFunc;
721 name = pfuncs->RegularName;
722 if (qfuncs) {
723 qfunc = qfuncs->regular;
724 qname = qfuncs->RegularName;
725 } else {
726 qfunc = NULL;
727 qname = NULL;
728 }
729 }
730 /*
731 * Initialize the xor nodes: params are {pda,buf}.
732 * From {Rod,Wnd,Rop} nodes, and raidPtr.
733 */
734 if (numParityNodes == 2) {
735 /* Double-xor case. */
736 for (i = 0; i < numParityNodes; i++) {
737 /* Note: no wakeup func for xor. */
738 rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func,
739 undoFunc, NULL, 1, (numDataNodes + numParityNodes),
740 7, 1, dag_h, name, allocList);
741 xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
742 xorNodes[i].params[0] = readDataNodes[i].params[0];
743 xorNodes[i].params[1] = readDataNodes[i].params[1];
744 xorNodes[i].params[2] = readParityNodes[i].params[0];
745 xorNodes[i].params[3] = readParityNodes[i].params[1];
746 xorNodes[i].params[4] = writeDataNodes[i].params[0];
747 xorNodes[i].params[5] = writeDataNodes[i].params[1];
748 xorNodes[i].params[6].p = raidPtr;
749 /* Use old parity buf as target buf. */
750 xorNodes[i].results[0] = readParityNodes[i].params[1].p;
751 if (nfaults == 2) {
752 /* Note: no wakeup func for qor. */
753 rf_InitNode(&qNodes[i], rf_wait, RF_FALSE,
754 qfunc, undoFunc, NULL, 1,
755 (numDataNodes + numParityNodes), 7, 1,
756 dag_h, qname, allocList);
757 qNodes[i].params[0] =
758 readDataNodes[i].params[0];
759 qNodes[i].params[1] =
760 readDataNodes[i].params[1];
761 qNodes[i].params[2] = readQNodes[i].params[0];
762 qNodes[i].params[3] = readQNodes[i].params[1];
763 qNodes[i].params[4] =
764 writeDataNodes[i].params[0];
765 qNodes[i].params[5] =
766 writeDataNodes[i].params[1];
767 qNodes[i].params[6].p = raidPtr;
768 /* Use old Q buf as target buf. */
769 qNodes[i].results[0] =
770 readQNodes[i].params[1].p;
771 }
772 }
773 } else {
774 /* There is only one xor node in this case. */
775 rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc,
776 NULL, 1, (numDataNodes + numParityNodes),
777 (2 * (numDataNodes + numDataNodes + 1) + 1), 1,
778 dag_h, name, allocList);
779 xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
780 for (i = 0; i < numDataNodes + 1; i++) {
781 /* Set up params related to Rod and Rop nodes. */
782 xorNodes[0].params[2 * i + 0] =
783 readDataNodes[i].params[0]; /* pda */
784 xorNodes[0].params[2 * i + 1] =
785 readDataNodes[i].params[1]; /* buffer ptr */
786 }
787 for (i = 0; i < numDataNodes; i++) {
788 /* Set up params related to Wnd and Wnp nodes. */
789 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] =
790 writeDataNodes[i].params[0]; /* pda */
791 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] =
792 writeDataNodes[i].params[1]; /* buffer ptr */
793 }
794 /* Xor node needs to get at RAID information. */
795 xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p =
796 raidPtr;
797 xorNodes[0].results[0] = readParityNodes[0].params[1].p;
798 if (nfaults == 2) {
799 rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc,
800 undoFunc, NULL, 1, (numDataNodes + numParityNodes),
801 (2 * (numDataNodes + numDataNodes + 1) + 1), 1,
802 dag_h, qname, allocList);
803 for (i = 0; i < numDataNodes; i++) {
804 /* Set up params related to Rod. */
805 qNodes[0].params[2 * i + 0] =
806 readDataNodes[i].params[0]; /* pda */
807 qNodes[0].params[2 * i + 1] =
808 readDataNodes[i].params[1]; /* buffer ptr */
809 }
810 /* And read old q. */
811 qNodes[0].params[2 * numDataNodes + 0] =
812 readQNodes[0].params[0]; /* pda */
813 qNodes[0].params[2 * numDataNodes + 1] =
814 readQNodes[0].params[1]; /* buffer ptr */
815 for (i = 0; i < numDataNodes; i++) {
816 /* Set up params related to Wnd nodes. */
817 qNodes[0].params
818 [2 * (numDataNodes + 1 + i) + 0] =
819 /* pda */
820 writeDataNodes[i].params[0];
821 qNodes[0].params
822 [2 * (numDataNodes + 1 + i) + 1] =
823 /* buffer ptr */
824 writeDataNodes[i].params[1];
825 }
826 /* Xor node needs to get at RAID information. */
827 qNodes[0].params
828 [2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;
829 qNodes[0].results[0] = readQNodes[0].params[1].p;
830 }
831 }
832
833 /* Initialize nodes which write new parity (Wnp). */
834 pda = asmap->parityInfo;
835 for (i = 0; i < numParityNodes; i++) {
836 rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE,
837 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
838 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
839 "Wnp", allocList);
840 RF_ASSERT(pda != NULL);
841 /* Param 1 (bufPtr) filled in by xor node. */
842 writeParityNodes[i].params[0].p = pda;
843 /* Buffer pointer for parity write operation. */
844 writeParityNodes[i].params[1].p = xorNodes[i].results[0];
845 writeParityNodes[i].params[2].v = parityStripeID;
846 writeParityNodes[i].params[3].v =
847 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
848 if (lu_flag) {
849 /* Initialize node to unlock the disk queue. */
850 rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE,
851 rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc,
852 rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
853 "Unp", allocList);
854 /* Physical disk addr desc. */
855 unlockParityNodes[i].params[0].p = pda;
856 unlockParityNodes[i].params[1].v =
857 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
858 0, lu_flag, which_ru);
859 }
860 pda = pda->next;
861 }
862
863 /* Initialize nodes which write new Q (Wnq). */
864 if (nfaults == 2) {
865 pda = asmap->qInfo;
866 for (i = 0; i < numParityNodes; i++) {
867 rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE,
868 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
869 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
870 "Wnq", allocList);
871 RF_ASSERT(pda != NULL);
872 /* Param 1 (bufPtr) filled in by xor node. */
873 writeQNodes[i].params[0].p = pda;
874 writeQNodes[i].params[1].p = qNodes[i].results[0];
875 /* Buffer pointer for parity write operation. */
876 writeQNodes[i].params[2].v = parityStripeID;
877 writeQNodes[i].params[3].v =
878 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
879 0, 0, which_ru);
880 if (lu_flag) {
881 /* Initialize node to unlock the disk queue. */
882 rf_InitNode(&unlockQNodes[i], rf_wait,
883 RF_FALSE, rf_DiskUnlockFunc,
884 rf_DiskUnlockUndoFunc,
885 rf_GenericWakeupFunc, 1, 1, 2, 0,
886 dag_h, "Unq", allocList);
887 /* Physical disk addr desc. */
888 unlockQNodes[i].params[0].p = pda;
889 unlockQNodes[i].params[1].v =
890 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
891 0, lu_flag, which_ru);
892 }
893 pda = pda->next;
894 }
895 }
896 /*
897 * Step 4. Connect the nodes.
898 */
899
900 /* Connect header to block node. */
901 dag_h->succedents[0] = blockNode;
902
903 /* Connect block node to read old data nodes. */
904 RF_ASSERT(blockNode->numSuccedents ==
905 (numDataNodes + (numParityNodes * nfaults)));
906 for (i = 0; i < numDataNodes; i++) {
907 blockNode->succedents[i] = &readDataNodes[i];
908 RF_ASSERT(readDataNodes[i].numAntecedents == 1);
909 readDataNodes[i].antecedents[0] = blockNode;
910 readDataNodes[i].antType[0] = rf_control;
911 }
912
913 /* Connect block node to read old parity nodes. */
914 for (i = 0; i < numParityNodes; i++) {
915 blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
916 RF_ASSERT(readParityNodes[i].numAntecedents == 1);
917 readParityNodes[i].antecedents[0] = blockNode;
918 readParityNodes[i].antType[0] = rf_control;
919 }
920
921 /* Connect block node to read old Q nodes. */
922 if (nfaults == 2) {
923 for (i = 0; i < numParityNodes; i++) {
924 blockNode->succedents[numDataNodes + numParityNodes + i]
925 = &readQNodes[i];
926 RF_ASSERT(readQNodes[i].numAntecedents == 1);
927 readQNodes[i].antecedents[0] = blockNode;
928 readQNodes[i].antType[0] = rf_control;
929 }
930 }
931 /* Connect read old data nodes to xor nodes. */
932 for (i = 0; i < numDataNodes; i++) {
933 RF_ASSERT(readDataNodes[i].numSuccedents ==
934 (nfaults * numParityNodes));
935 for (j = 0; j < numParityNodes; j++) {
936 RF_ASSERT(xorNodes[j].numAntecedents ==
937 numDataNodes + numParityNodes);
938 readDataNodes[i].succedents[j] = &xorNodes[j];
939 xorNodes[j].antecedents[i] = &readDataNodes[i];
940 xorNodes[j].antType[i] = rf_trueData;
941 }
942 }
943
944 /* Connect read old data nodes to q nodes. */
945 if (nfaults == 2) {
946 for (i = 0; i < numDataNodes; i++) {
947 for (j = 0; j < numParityNodes; j++) {
948 RF_ASSERT(qNodes[j].numAntecedents ==
949 numDataNodes + numParityNodes);
950 readDataNodes[i].succedents[numParityNodes + j]
951 = &qNodes[j];
952 qNodes[j].antecedents[i] = &readDataNodes[i];
953 qNodes[j].antType[i] = rf_trueData;
954 }
955 }
956 }
957 /* Connect read old parity nodes to xor nodes. */
958 for (i = 0; i < numParityNodes; i++) {
959 RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
960 for (j = 0; j < numParityNodes; j++) {
961 readParityNodes[i].succedents[j] = &xorNodes[j];
962 xorNodes[j].antecedents[numDataNodes + i] =
963 &readParityNodes[i];
964 xorNodes[j].antType[numDataNodes + i] = rf_trueData;
965 }
966 }
967
968 /* Connect read old q nodes to q nodes. */
969 if (nfaults == 2) {
970 for (i = 0; i < numParityNodes; i++) {
971 RF_ASSERT(readParityNodes[i].numSuccedents ==
972 numParityNodes);
973 for (j = 0; j < numParityNodes; j++) {
974 readQNodes[i].succedents[j] = &qNodes[j];
975 qNodes[j].antecedents[numDataNodes + i] =
976 &readQNodes[i];
977 qNodes[j].antType[numDataNodes + i] =
978 rf_trueData;
979 }
980 }
981 }
982 /* Connect xor nodes to commit node. */
983 RF_ASSERT(commitNode->numAntecedents == (nfaults * numParityNodes));
984 for (i = 0; i < numParityNodes; i++) {
985 RF_ASSERT(xorNodes[i].numSuccedents == 1);
986 xorNodes[i].succedents[0] = commitNode;
987 commitNode->antecedents[i] = &xorNodes[i];
988 commitNode->antType[i] = rf_control;
989 }
990
991 /* Connect q nodes to commit node. */
992 if (nfaults == 2) {
993 for (i = 0; i < numParityNodes; i++) {
994 RF_ASSERT(qNodes[i].numSuccedents == 1);
995 qNodes[i].succedents[0] = commitNode;
996 commitNode->antecedents[i + numParityNodes] =
997 &qNodes[i];
998 commitNode->antType[i + numParityNodes] = rf_control;
999 }
1000 }
1001 /* Connect commit node to write nodes. */
1002 RF_ASSERT(commitNode->numSuccedents ==
1003 (numDataNodes + (nfaults * numParityNodes)));
1004 for (i = 0; i < numDataNodes; i++) {
1005 RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
1006 commitNode->succedents[i] = &writeDataNodes[i];
1007 writeDataNodes[i].antecedents[0] = commitNode;
1008 writeDataNodes[i].antType[0] = rf_trueData;
1009 }
1010 for (i = 0; i < numParityNodes; i++) {
1011 RF_ASSERT(writeParityNodes[i].numAntecedents == 1);
1012 commitNode->succedents[i + numDataNodes] = &writeParityNodes[i];
1013 writeParityNodes[i].antecedents[0] = commitNode;
1014 writeParityNodes[i].antType[0] = rf_trueData;
1015 }
1016 if (nfaults == 2) {
1017 for (i = 0; i < numParityNodes; i++) {
1018 RF_ASSERT(writeQNodes[i].numAntecedents == 1);
1019 commitNode->succedents
1020 [i + numDataNodes + numParityNodes] =
1021 &writeQNodes[i];
1022 writeQNodes[i].antecedents[0] = commitNode;
1023 writeQNodes[i].antType[0] = rf_trueData;
1024 }
1025 }
1026 RF_ASSERT(termNode->numAntecedents ==
1027 (numDataNodes + (nfaults * numParityNodes)));
1028 RF_ASSERT(termNode->numSuccedents == 0);
1029 for (i = 0; i < numDataNodes; i++) {
1030 if (lu_flag) {
1031 /* Connect write new data nodes to unlock nodes. */
1032 RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
1033 RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
1034 writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
1035 unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
1036 unlockDataNodes[i].antType[0] = rf_control;
1037
1038 /* Connect unlock nodes to term node. */
1039 RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
1040 unlockDataNodes[i].succedents[0] = termNode;
1041 termNode->antecedents[i] = &unlockDataNodes[i];
1042 termNode->antType[i] = rf_control;
1043 } else {
1044 /* Connect write new data nodes to term node. */
1045 RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
1046 RF_ASSERT(termNode->numAntecedents ==
1047 (numDataNodes + (nfaults * numParityNodes)));
1048 writeDataNodes[i].succedents[0] = termNode;
1049 termNode->antecedents[i] = &writeDataNodes[i];
1050 termNode->antType[i] = rf_control;
1051 }
1052 }
1053
1054 for (i = 0; i < numParityNodes; i++) {
1055 if (lu_flag) {
1056 /* Connect write new parity nodes to unlock nodes. */
1057 RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
1058 RF_ASSERT(unlockParityNodes[i].numAntecedents == 1);
1059 writeParityNodes[i].succedents[0] =
1060 &unlockParityNodes[i];
1061 unlockParityNodes[i].antecedents[0] =
1062 &writeParityNodes[i];
1063 unlockParityNodes[i].antType[0] = rf_control;
1064
1065 /* Connect unlock nodes to term node. */
1066 RF_ASSERT(unlockParityNodes[i].numSuccedents == 1);
1067 unlockParityNodes[i].succedents[0] = termNode;
1068 termNode->antecedents[numDataNodes + i] =
1069 &unlockParityNodes[i];
1070 termNode->antType[numDataNodes + i] = rf_control;
1071 } else {
1072 RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
1073 writeParityNodes[i].succedents[0] = termNode;
1074 termNode->antecedents[numDataNodes + i] =
1075 &writeParityNodes[i];
1076 termNode->antType[numDataNodes + i] = rf_control;
1077 }
1078 }
1079
1080 if (nfaults == 2) {
1081 for (i = 0; i < numParityNodes; i++) {
1082 if (lu_flag) {
1083 /* Connect write new Q nodes to unlock nodes. */
1084 RF_ASSERT(writeQNodes[i].numSuccedents == 1);
1085 RF_ASSERT(unlockQNodes[i].numAntecedents == 1);
1086 writeQNodes[i].succedents[0] = &unlockQNodes[i];
1087 unlockQNodes[i].antecedents[0] =
1088 &writeQNodes[i];
1089 unlockQNodes[i].antType[0] = rf_control;
1090
1091 /* Connect unlock nodes to unblock node. */
1092 RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
1093 unlockQNodes[i].succedents[0] = termNode;
1094 termNode->antecedents
1095 [numDataNodes + numParityNodes + i] =
1096 &unlockQNodes[i];
1097 termNode->antType
1098 [numDataNodes + numParityNodes + i] =
1099 rf_control;
1100 } else {
1101 RF_ASSERT(writeQNodes[i].numSuccedents == 1);
1102 writeQNodes[i].succedents[0] = termNode;
1103 termNode->antecedents
1104 [numDataNodes + numParityNodes + i] =
1105 &writeQNodes[i];
1106 termNode->antType
1107 [numDataNodes + numParityNodes + i] =
1108 rf_control;
1109 }
1110 }
1111 }
1112 }
1113
1114
1115 /*****************************************************************************
1116 * Create a write graph (fault-free or degraded) for RAID level 1.
1117 *
1118 * Hdr -> Commit -> Wpd -> Nil -> Trm
1119 * -> Wsd ->
1120 *
1121 * The "Wpd" node writes data to the primary copy in the mirror pair.
1122 * The "Wsd" node writes data to the secondary copy in the mirror pair.
1123 *
 * Parameters: raidPtr   - description of the physical array
 *             asmap     - logical & physical addresses for this access
 *             dag_h     - header of the DAG being constructed
 *             bp        - buffer ptr (holds write data)
 *             flags     - general flags (e.g. disk locking)
 *             allocList - list of memory allocated in DAG creation
1129 *****************************************************************************/
1130
1131 void
rf_CreateRaidOneWriteDAG(RF_Raid_t * raidPtr,RF_AccessStripeMap_t * asmap,RF_DagHeader_t * dag_h,void * bp,RF_RaidAccessFlags_t flags,RF_AllocListElem_t * allocList)1132 rf_CreateRaidOneWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
1133 RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
1134 RF_AllocListElem_t *allocList)
1135 {
1136 RF_DagNode_t *unblockNode, *termNode, *commitNode;
1137 RF_DagNode_t *nodes, *wndNode, *wmirNode;
1138 int nWndNodes, nWmirNodes, i;
1139 RF_ReconUnitNum_t which_ru;
1140 RF_PhysDiskAddr_t *pda, *pdaP;
1141 RF_StripeNum_t parityStripeID;
1142
1143 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
1144 asmap->raidAddress, &which_ru);
1145 if (rf_dagDebug) {
1146 printf("[Creating RAID level 1 write DAG]\n");
1147 }
1148 dag_h->creator = "RaidOneWriteDAG";
1149
1150 /* 2 implies access not SU aligned. */
1151 nWmirNodes = (asmap->parityInfo->next) ? 2 : 1;
1152 nWndNodes = (asmap->physInfo->next) ? 2 : 1;
1153
1154 /* Alloc the Wnd nodes and the Wmir node. */
1155 if (asmap->numDataFailed == 1)
1156 nWndNodes--;
1157 if (asmap->numParityFailed == 1)
1158 nWmirNodes--;
1159
1160 /*
1161 * Total number of nodes = nWndNodes + nWmirNodes
1162 * + (commit + unblock + terminator)
1163 */
1164 RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3, sizeof(RF_DagNode_t),
1165 (RF_DagNode_t *), allocList);
1166 i = 0;
1167 wndNode = &nodes[i];
1168 i += nWndNodes;
1169 wmirNode = &nodes[i];
1170 i += nWmirNodes;
1171 commitNode = &nodes[i];
1172 i += 1;
1173 unblockNode = &nodes[i];
1174 i += 1;
1175 termNode = &nodes[i];
1176 i += 1;
1177 RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));
1178
1179 /* This dag can commit immediately. */
1180 dag_h->numCommitNodes = 1;
1181 dag_h->numCommits = 0;
1182 dag_h->numSuccedents = 1;
1183
1184 /* Initialize the commit, unblock, and term nodes. */
1185 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc,
1186 rf_NullNodeUndoFunc, NULL, (nWndNodes + nWmirNodes), 0, 0, 0,
1187 dag_h, "Cmt", allocList);
1188 rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
1189 rf_NullNodeUndoFunc, NULL, 1, (nWndNodes + nWmirNodes), 0, 0,
1190 dag_h, "Nil", allocList);
1191 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
1192 rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
1193
1194 /* Initialize the wnd nodes. */
1195 if (nWndNodes > 0) {
1196 pda = asmap->physInfo;
1197 for (i = 0; i < nWndNodes; i++) {
1198 rf_InitNode(&wndNode[i], rf_wait, RF_FALSE,
1199 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1200 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
1201 "Wpd", allocList);
1202 RF_ASSERT(pda != NULL);
1203 wndNode[i].params[0].p = pda;
1204 wndNode[i].params[1].p = pda->bufPtr;
1205 wndNode[i].params[2].v = parityStripeID;
1206 wndNode[i].params[3].v =
1207 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1208 0, 0, which_ru);
1209 pda = pda->next;
1210 }
1211 RF_ASSERT(pda == NULL);
1212 }
1213 /* Initialize the mirror nodes. */
1214 if (nWmirNodes > 0) {
1215 pda = asmap->physInfo;
1216 pdaP = asmap->parityInfo;
1217 for (i = 0; i < nWmirNodes; i++) {
1218 rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE,
1219 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1220 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
1221 "Wsd", allocList);
1222 RF_ASSERT(pda != NULL);
1223 wmirNode[i].params[0].p = pdaP;
1224 wmirNode[i].params[1].p = pda->bufPtr;
1225 wmirNode[i].params[2].v = parityStripeID;
1226 wmirNode[i].params[3].v =
1227 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1228 0, 0, which_ru);
1229 pda = pda->next;
1230 pdaP = pdaP->next;
1231 }
1232 RF_ASSERT(pda == NULL);
1233 RF_ASSERT(pdaP == NULL);
1234 }
1235 /* Link the header node to the commit node. */
1236 RF_ASSERT(dag_h->numSuccedents == 1);
1237 RF_ASSERT(commitNode->numAntecedents == 0);
1238 dag_h->succedents[0] = commitNode;
1239
1240 /* Link the commit node to the write nodes. */
1241 RF_ASSERT(commitNode->numSuccedents == (nWndNodes + nWmirNodes));
1242 for (i = 0; i < nWndNodes; i++) {
1243 RF_ASSERT(wndNode[i].numAntecedents == 1);
1244 commitNode->succedents[i] = &wndNode[i];
1245 wndNode[i].antecedents[0] = commitNode;
1246 wndNode[i].antType[0] = rf_control;
1247 }
1248 for (i = 0; i < nWmirNodes; i++) {
1249 RF_ASSERT(wmirNode[i].numAntecedents == 1);
1250 commitNode->succedents[i + nWndNodes] = &wmirNode[i];
1251 wmirNode[i].antecedents[0] = commitNode;
1252 wmirNode[i].antType[0] = rf_control;
1253 }
1254
1255 /* Link the write nodes to the unblock node. */
1256 RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
1257 for (i = 0; i < nWndNodes; i++) {
1258 RF_ASSERT(wndNode[i].numSuccedents == 1);
1259 wndNode[i].succedents[0] = unblockNode;
1260 unblockNode->antecedents[i] = &wndNode[i];
1261 unblockNode->antType[i] = rf_control;
1262 }
1263 for (i = 0; i < nWmirNodes; i++) {
1264 RF_ASSERT(wmirNode[i].numSuccedents == 1);
1265 wmirNode[i].succedents[0] = unblockNode;
1266 unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
1267 unblockNode->antType[i + nWndNodes] = rf_control;
1268 }
1269
1270 /* Link the unblock node to the term node. */
1271 RF_ASSERT(unblockNode->numSuccedents == 1);
1272 RF_ASSERT(termNode->numAntecedents == 1);
1273 RF_ASSERT(termNode->numSuccedents == 0);
1274 unblockNode->succedents[0] = termNode;
1275 termNode->antecedents[0] = unblockNode;
1276 termNode->antType[0] = rf_control;
1277 }
1278
1279
1280
1281 /*
1282 * DAGs that have no commit points.
1283 *
1284 * The following DAGs are used in forward and backward error recovery
1285 * experiments.
1286 * They are identical to the DAGs above this comment with the exception that
1287 * the commit points have been removed.
1288 */
1289
1290
1291 void
rf_CommonCreateLargeWriteDAGFwd(RF_Raid_t * raidPtr,RF_AccessStripeMap_t * asmap,RF_DagHeader_t * dag_h,void * bp,RF_RaidAccessFlags_t flags,RF_AllocListElem_t * allocList,int nfaults,int (* redFunc)(RF_DagNode_t *),int allowBufferRecycle)1292 rf_CommonCreateLargeWriteDAGFwd(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
1293 RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
1294 RF_AllocListElem_t *allocList, int nfaults, int (*redFunc) (RF_DagNode_t *),
1295 int allowBufferRecycle)
1296 {
1297 RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
1298 RF_DagNode_t *wnqNode, *blockNode, *syncNode, *termNode;
1299 int nWndNodes, nRodNodes, i, nodeNum, asmNum;
1300 RF_AccessStripeMapHeader_t *new_asm_h[2];
1301 RF_StripeNum_t parityStripeID;
1302 char *sosBuffer, *eosBuffer;
1303 RF_ReconUnitNum_t which_ru;
1304 RF_RaidLayout_t *layoutPtr;
1305 RF_PhysDiskAddr_t *pda;
1306
1307 layoutPtr = &(raidPtr->Layout);
1308 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
1309 asmap->raidAddress, &which_ru);
1310
1311 if (rf_dagDebug)
1312 printf("[Creating large-write DAG]\n");
1313 dag_h->creator = "LargeWriteDAGFwd";
1314
1315 dag_h->numCommitNodes = 0;
1316 dag_h->numCommits = 0;
1317 dag_h->numSuccedents = 1;
1318
1319 /* Alloc the nodes: Wnd, xor, commit, block, term, and Wnp. */
1320 nWndNodes = asmap->numStripeUnitsAccessed;
1321 RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t),
1322 (RF_DagNode_t *), allocList);
1323 i = 0;
1324 wndNodes = &nodes[i];
1325 i += nWndNodes;
1326 xorNode = &nodes[i];
1327 i += 1;
1328 wnpNode = &nodes[i];
1329 i += 1;
1330 blockNode = &nodes[i];
1331 i += 1;
1332 syncNode = &nodes[i];
1333 i += 1;
1334 termNode = &nodes[i];
1335 i += 1;
1336 if (nfaults == 2) {
1337 wnqNode = &nodes[i];
1338 i += 1;
1339 } else {
1340 wnqNode = NULL;
1341 }
1342 rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h,
1343 new_asm_h, &nRodNodes, &sosBuffer, &eosBuffer, allocList);
1344 if (nRodNodes > 0) {
1345 RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t),
1346 (RF_DagNode_t *), allocList);
1347 } else {
1348 rodNodes = NULL;
1349 }
1350
1351 /* Begin node initialization. */
1352 if (nRodNodes > 0) {
1353 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
1354 rf_NullNodeUndoFunc, NULL, nRodNodes, 0, 0, 0, dag_h,
1355 "Nil", allocList);
1356 rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
1357 rf_NullNodeUndoFunc, NULL, nWndNodes + 1, nRodNodes, 0, 0,
1358 dag_h, "Nil", allocList);
1359 } else {
1360 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
1361 rf_NullNodeUndoFunc, NULL, 1, 0, 0, 0, dag_h, "Nil",
1362 allocList);
1363 rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
1364 rf_NullNodeUndoFunc, NULL, nWndNodes + 1, 1, 0, 0, dag_h,
1365 "Nil", allocList);
1366 }
1367
1368 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
1369 rf_TerminateUndoFunc, NULL, 0, nWndNodes + nfaults, 0, 0,
1370 dag_h, "Trm", allocList);
1371
1372 /* Initialize the Rod nodes. */
1373 for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
1374 if (new_asm_h[asmNum]) {
1375 pda = new_asm_h[asmNum]->stripeMap->physInfo;
1376 while (pda) {
1377 rf_InitNode(&rodNodes[nodeNum], rf_wait,
1378 RF_FALSE, rf_DiskReadFunc,
1379 rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
1380 1, 1, 4, 0, dag_h, "Rod", allocList);
1381 rodNodes[nodeNum].params[0].p = pda;
1382 rodNodes[nodeNum].params[1].p = pda->bufPtr;
1383 rodNodes[nodeNum].params[2].v = parityStripeID;
1384 rodNodes[nodeNum].params[3].v =
1385 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1386 0, 0, which_ru);
1387 nodeNum++;
1388 pda = pda->next;
1389 }
1390 }
1391 }
1392 RF_ASSERT(nodeNum == nRodNodes);
1393
1394 /* Initialize the wnd nodes. */
1395 pda = asmap->physInfo;
1396 for (i = 0; i < nWndNodes; i++) {
1397 rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
1398 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
1399 dag_h, "Wnd", allocList);
1400 RF_ASSERT(pda != NULL);
1401 wndNodes[i].params[0].p = pda;
1402 wndNodes[i].params[1].p = pda->bufPtr;
1403 wndNodes[i].params[2].v = parityStripeID;
1404 wndNodes[i].params[3].v =
1405 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1406 pda = pda->next;
1407 }
1408
1409 /* Initialize the redundancy node. */
1410 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc,
1411 NULL, 1, nfaults, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h,
1412 "Xr ", allocList);
1413 xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
1414 for (i = 0; i < nWndNodes; i++) {
1415 xorNode->params[2 * i + 0] =
1416 wndNodes[i].params[0]; /* pda */
1417 xorNode->params[2 * i + 1] =
1418 wndNodes[i].params[1]; /* buf ptr */
1419 }
1420 for (i = 0; i < nRodNodes; i++) {
1421 xorNode->params[2 * (nWndNodes + i) + 0] =
1422 rodNodes[i].params[0]; /* pda */
1423 xorNode->params[2 * (nWndNodes + i) + 1] =
1424 rodNodes[i].params[1]; /* buf ptr */
1425 }
1426 /* Xor node needs to get at RAID information. */
1427 xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr;
1428
1429 /*
1430 * Look for an Rod node that reads a complete SU. If none, alloc a
1431 * buffer to receive the parity info. Note that we can't use a new
1432 * data buffer because it will not have gotten written when the xor
1433 * occurs.
1434 */
1435 if (allowBufferRecycle) {
1436 for (i = 0; i < nRodNodes; i++)
1437 if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)
1438 ->numSector == raidPtr->Layout.sectorsPerStripeUnit)
1439 break;
1440 }
1441 if ((!allowBufferRecycle) || (i == nRodNodes)) {
1442 RF_CallocAndAdd(xorNode->results[0], 1,
1443 rf_RaidAddressToByte(raidPtr,
1444 raidPtr->Layout.sectorsPerStripeUnit),
1445 (void *), allocList);
1446 } else
1447 xorNode->results[0] = rodNodes[i].params[1].p;
1448
1449 /* Initialize the Wnp node. */
1450 rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc,
1451 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
1452 dag_h, "Wnp", allocList);
1453 wnpNode->params[0].p = asmap->parityInfo;
1454 wnpNode->params[1].p = xorNode->results[0];
1455 wnpNode->params[2].v = parityStripeID;
1456 wnpNode->params[3].v =
1457 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1458 /* parityInfo must describe entire parity unit. */
1459 RF_ASSERT(asmap->parityInfo->next == NULL);
1460
1461 if (nfaults == 2) {
1462 /*
1463 * Never try to recycle a buffer for the Q calcuation in
1464 * addition to the parity. This would cause two buffers to
1465 * get smashed during the P and Q calculation, guaranteeing
1466 * one would be wrong.
1467 */
1468 RF_CallocAndAdd(xorNode->results[1], 1,
1469 rf_RaidAddressToByte(raidPtr,
1470 raidPtr->Layout.sectorsPerStripeUnit),
1471 (void *), allocList);
1472 rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc,
1473 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
1474 dag_h, "Wnq", allocList);
1475 wnqNode->params[0].p = asmap->qInfo;
1476 wnqNode->params[1].p = xorNode->results[1];
1477 wnqNode->params[2].v = parityStripeID;
1478 wnqNode->params[3].v =
1479 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1480 /* parityInfo must describe entire parity unit. */
1481 RF_ASSERT(asmap->parityInfo->next == NULL);
1482 }
1483
1484 /* Connect nodes to form graph. */
1485
1486 /* Connect dag header to block node. */
1487 RF_ASSERT(blockNode->numAntecedents == 0);
1488 dag_h->succedents[0] = blockNode;
1489
1490 if (nRodNodes > 0) {
1491 /* Connect the block node to the Rod nodes. */
1492 RF_ASSERT(blockNode->numSuccedents == nRodNodes);
1493 RF_ASSERT(syncNode->numAntecedents == nRodNodes);
1494 for (i = 0; i < nRodNodes; i++) {
1495 RF_ASSERT(rodNodes[i].numAntecedents == 1);
1496 blockNode->succedents[i] = &rodNodes[i];
1497 rodNodes[i].antecedents[0] = blockNode;
1498 rodNodes[i].antType[0] = rf_control;
1499
1500 /* Connect the Rod nodes to the Nil node. */
1501 RF_ASSERT(rodNodes[i].numSuccedents == 1);
1502 rodNodes[i].succedents[0] = syncNode;
1503 syncNode->antecedents[i] = &rodNodes[i];
1504 syncNode->antType[i] = rf_trueData;
1505 }
1506 } else {
1507 /* Connect the block node to the Nil node. */
1508 RF_ASSERT(blockNode->numSuccedents == 1);
1509 RF_ASSERT(syncNode->numAntecedents == 1);
1510 blockNode->succedents[0] = syncNode;
1511 syncNode->antecedents[0] = blockNode;
1512 syncNode->antType[0] = rf_control;
1513 }
1514
1515 /* Connect the sync node to the Wnd nodes. */
1516 RF_ASSERT(syncNode->numSuccedents == (1 + nWndNodes));
1517 for (i = 0; i < nWndNodes; i++) {
1518 RF_ASSERT(wndNodes->numAntecedents == 1);
1519 syncNode->succedents[i] = &wndNodes[i];
1520 wndNodes[i].antecedents[0] = syncNode;
1521 wndNodes[i].antType[0] = rf_control;
1522 }
1523
1524 /* Connect the sync node to the Xor node. */
1525 RF_ASSERT(xorNode->numAntecedents == 1);
1526 syncNode->succedents[nWndNodes] = xorNode;
1527 xorNode->antecedents[0] = syncNode;
1528 xorNode->antType[0] = rf_control;
1529
1530 /* Connect the xor node to the write parity node. */
1531 RF_ASSERT(xorNode->numSuccedents == nfaults);
1532 RF_ASSERT(wnpNode->numAntecedents == 1);
1533 xorNode->succedents[0] = wnpNode;
1534 wnpNode->antecedents[0] = xorNode;
1535 wnpNode->antType[0] = rf_trueData;
1536 if (nfaults == 2) {
1537 RF_ASSERT(wnqNode->numAntecedents == 1);
1538 xorNode->succedents[1] = wnqNode;
1539 wnqNode->antecedents[0] = xorNode;
1540 wnqNode->antType[0] = rf_trueData;
1541 }
1542 /* Connect the write nodes to the term node. */
1543 RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
1544 RF_ASSERT(termNode->numSuccedents == 0);
1545 for (i = 0; i < nWndNodes; i++) {
1546 RF_ASSERT(wndNodes->numSuccedents == 1);
1547 wndNodes[i].succedents[0] = termNode;
1548 termNode->antecedents[i] = &wndNodes[i];
1549 termNode->antType[i] = rf_control;
1550 }
1551 RF_ASSERT(wnpNode->numSuccedents == 1);
1552 wnpNode->succedents[0] = termNode;
1553 termNode->antecedents[nWndNodes] = wnpNode;
1554 termNode->antType[nWndNodes] = rf_control;
1555 if (nfaults == 2) {
1556 RF_ASSERT(wnqNode->numSuccedents == 1);
1557 wnqNode->succedents[0] = termNode;
1558 termNode->antecedents[nWndNodes + 1] = wnqNode;
1559 termNode->antType[nWndNodes + 1] = rf_control;
1560 }
1561 }
1562
1563
1564 /*****************************************************************************
1565 *
1566 * Create a DAG to perform a small-write operation (either raid 5 or pq),
1567 * which is as follows:
1568 *
1569 * Hdr -> Nil -> Rop - Xor - Wnp [Unp] -- Trm
1570 * \- Rod X- Wnd [Und] -------/
1571 * [\- Rod X- Wnd [Und] ------/]
1572 * [\- Roq - Q --> Wnq [Unq]-/]
1573 *
1574 * Rop = read old parity
1575 * Rod = read old data
1576 * Roq = read old "q"
 * Cmt = commit node (not present here: this forward-recovery DAG has no commit point)
1578 * Und = unlock data disk
1579 * Unp = unlock parity disk
1580 * Unq = unlock q disk
1581 * Wnp = write new parity
1582 * Wnd = write new data
1583 * Wnq = write new "q"
1584 * [ ] denotes optional segments in the graph.
1585 *
 * Parameters: raidPtr   - description of the physical array
 *             asmap     - logical & physical addresses for this access
 *             dag_h     - header of the DAG being constructed
 *             bp        - buffer ptr (holds write data)
 *             flags     - general flags (e.g. disk locking)
 *             allocList - list of memory allocated in DAG creation
 *             pfuncs    - list of parity generating functions
 *             qfuncs    - list of q generating functions
1593 *
1594 * A null qfuncs indicates single fault tolerant.
1595 *****************************************************************************/
1596
1597 void
rf_CommonCreateSmallWriteDAGFwd(RF_Raid_t * raidPtr,RF_AccessStripeMap_t * asmap,RF_DagHeader_t * dag_h,void * bp,RF_RaidAccessFlags_t flags,RF_AllocListElem_t * allocList,RF_RedFuncs_t * pfuncs,RF_RedFuncs_t * qfuncs)1598 rf_CommonCreateSmallWriteDAGFwd(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
1599 RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
1600 RF_AllocListElem_t *allocList, RF_RedFuncs_t *pfuncs, RF_RedFuncs_t *qfuncs)
1601 {
1602 RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
1603 RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
1604 RF_DagNode_t *xorNodes, *qNodes, *blockNode, *nodes;
1605 RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
1606 int i, j, nNodes, totalNumNodes, lu_flag;
1607 RF_ReconUnitNum_t which_ru;
1608 int (*func) (RF_DagNode_t *);
1609 int (*undoFunc) (RF_DagNode_t *);
1610 int (*qfunc) (RF_DagNode_t *);
1611 int numDataNodes, numParityNodes;
1612 RF_StripeNum_t parityStripeID;
1613 RF_PhysDiskAddr_t *pda;
1614 char *name, *qname;
1615 long nfaults;
1616
1617 nfaults = qfuncs ? 2 : 1;
1618 lu_flag = (rf_enableAtomicRMW) ? 1 : 0; /* Lock/unlock flag. */
1619
1620 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
1621 asmap->raidAddress, &which_ru);
1622 pda = asmap->physInfo;
1623 numDataNodes = asmap->numStripeUnitsAccessed;
1624 numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
1625
1626 if (rf_dagDebug)
1627 printf("[Creating small-write DAG]\n");
1628 RF_ASSERT(numDataNodes > 0);
1629 dag_h->creator = "SmallWriteDAGFwd";
1630
1631 dag_h->numCommitNodes = 0;
1632 dag_h->numCommits = 0;
1633 dag_h->numSuccedents = 1;
1634
1635 qfunc = NULL;
1636 qname = NULL;
1637
1638 /*
1639 * DAG creation occurs in four steps:
1640 * 1. Count the number of nodes in the DAG.
1641 * 2. Create the nodes.
1642 * 3. Initialize the nodes.
1643 * 4. Connect the nodes.
1644 */
1645
1646 /* Step 1. Compute number of nodes in the graph. */
1647
1648 /*
1649 * Number of nodes: a read and write for each data unit, a redundancy
1650 * computation node for each parity node (nfaults * nparity), a read
1651 * and write for each parity unit, a block node, a terminate node if
1652 * atomic RMW, an unlock node for each data/redundancy unit.
1653 */
1654 totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes)
1655 + (nfaults * 2 * numParityNodes) + 2;
1656 if (lu_flag)
1657 totalNumNodes += (numDataNodes + (nfaults * numParityNodes));
1658
1659
1660 /* Step 2. Create the nodes. */
1661 RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t),
1662 (RF_DagNode_t *), allocList);
1663 i = 0;
1664 blockNode = &nodes[i];
1665 i += 1;
1666 readDataNodes = &nodes[i];
1667 i += numDataNodes;
1668 readParityNodes = &nodes[i];
1669 i += numParityNodes;
1670 writeDataNodes = &nodes[i];
1671 i += numDataNodes;
1672 writeParityNodes = &nodes[i];
1673 i += numParityNodes;
1674 xorNodes = &nodes[i];
1675 i += numParityNodes;
1676 termNode = &nodes[i];
1677 i += 1;
1678 if (lu_flag) {
1679 unlockDataNodes = &nodes[i];
1680 i += numDataNodes;
1681 unlockParityNodes = &nodes[i];
1682 i += numParityNodes;
1683 } else {
1684 unlockDataNodes = unlockParityNodes = NULL;
1685 }
1686 if (nfaults == 2) {
1687 readQNodes = &nodes[i];
1688 i += numParityNodes;
1689 writeQNodes = &nodes[i];
1690 i += numParityNodes;
1691 qNodes = &nodes[i];
1692 i += numParityNodes;
1693 if (lu_flag) {
1694 unlockQNodes = &nodes[i];
1695 i += numParityNodes;
1696 } else {
1697 unlockQNodes = NULL;
1698 }
1699 } else {
1700 readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;
1701 }
1702 RF_ASSERT(i == totalNumNodes);
1703
1704 /* Step 3. Initialize the nodes. */
1705 /* Initialize block node (Nil). */
1706 nNodes = numDataNodes + (nfaults * numParityNodes);
1707 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
1708 rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h,
1709 "Nil", allocList);
1710
1711 /* Initialize terminate node (Trm). */
1712 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
1713 rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0, dag_h,
1714 "Trm", allocList);
1715
1716 /* Initialize nodes which read old data (Rod). */
1717 for (i = 0; i < numDataNodes; i++) {
1718 rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE,
1719 rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
1720 (numParityNodes * nfaults) + 1, 1, 4, 0, dag_h,
1721 "Rod", allocList);
1722 RF_ASSERT(pda != NULL);
1723 /* Physical disk addr desc. */
1724 readDataNodes[i].params[0].p = pda;
1725 /* Buffer to hold old data. */
1726 readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h,
1727 pda, allocList);
1728 readDataNodes[i].params[2].v = parityStripeID;
1729 readDataNodes[i].params[3].v =
1730 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1731 lu_flag, 0, which_ru);
1732 pda = pda->next;
1733 for (j = 0; j < readDataNodes[i].numSuccedents; j++)
1734 readDataNodes[i].propList[j] = NULL;
1735 }
1736
1737 /* Initialize nodes which read old parity (Rop). */
1738 pda = asmap->parityInfo;
1739 i = 0;
1740 for (i = 0; i < numParityNodes; i++) {
1741 RF_ASSERT(pda != NULL);
1742 rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE,
1743 rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
1744 numParityNodes, 1, 4, 0, dag_h, "Rop", allocList);
1745 readParityNodes[i].params[0].p = pda;
1746 /* Buffer to hold old parity. */
1747 readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
1748 dag_h, pda, allocList);
1749 readParityNodes[i].params[2].v = parityStripeID;
1750 readParityNodes[i].params[3].v =
1751 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1752 lu_flag, 0, which_ru);
1753 for (j = 0; j < readParityNodes[i].numSuccedents; j++)
1754 readParityNodes[i].propList[0] = NULL;
1755 pda = pda->next;
1756 }
1757
1758 /* Initialize nodes which read old Q (Roq). */
1759 if (nfaults == 2) {
1760 pda = asmap->qInfo;
1761 for (i = 0; i < numParityNodes; i++) {
1762 RF_ASSERT(pda != NULL);
1763 rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE,
1764 rf_DiskReadFunc, rf_DiskReadUndoFunc,
1765 rf_GenericWakeupFunc, numParityNodes, 1, 4, 0,
1766 dag_h, "Roq", allocList);
1767 readQNodes[i].params[0].p = pda;
1768 /* Buffer to hold old Q. */
1769 readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
1770 dag_h, pda, allocList);
1771 readQNodes[i].params[2].v = parityStripeID;
1772 readQNodes[i].params[3].v =
1773 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1774 lu_flag, 0, which_ru);
1775 for (j = 0; j < readQNodes[i].numSuccedents; j++)
1776 readQNodes[i].propList[0] = NULL;
1777 pda = pda->next;
1778 }
1779 }
1780 /* Initialize nodes which write new data (Wnd). */
1781 pda = asmap->physInfo;
1782 for (i = 0; i < numDataNodes; i++) {
1783 RF_ASSERT(pda != NULL);
1784 rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE,
1785 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1786 rf_GenericWakeupFunc, 1, 1, 4, 0,
1787 dag_h, "Wnd", allocList);
1788 /* Physical disk addr desc. */
1789 writeDataNodes[i].params[0].p = pda;
1790 /* Buffer holding new data to be written. */
1791 writeDataNodes[i].params[1].p = pda->bufPtr;
1792 writeDataNodes[i].params[2].v = parityStripeID;
1793 writeDataNodes[i].params[3].v =
1794 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1795
1796 if (lu_flag) {
1797 /* Initialize node to unlock the disk queue. */
1798 rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE,
1799 rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc,
1800 rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
1801 "Und", allocList);
1802 /* Physical disk addr desc. */
1803 unlockDataNodes[i].params[0].p = pda;
1804 unlockDataNodes[i].params[1].v =
1805 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1806 0, lu_flag, which_ru);
1807 }
1808 pda = pda->next;
1809 }
1810
1811
1812 /* Initialize nodes which compute new parity and Q. */
1813 /*
1814 * Use the simple XOR func in the double-XOR case, and when
1815 * accessing only a portion of one stripe unit. The distinction
1816 * between the two is that the regular XOR func assumes that the
1817 * targbuf is a full SU in size, and examines the pda associated with
1818 * the buffer to decide where within the buffer to XOR the data,
1819 * whereas the simple XOR func just XORs the data into the start of
1820 * the buffer.
1821 */
1822 if ((numParityNodes == 2) || ((numDataNodes == 1) &&
1823 (asmap->totalSectorsAccessed <
1824 raidPtr->Layout.sectorsPerStripeUnit))) {
1825 func = pfuncs->simple;
1826 undoFunc = rf_NullNodeUndoFunc;
1827 name = pfuncs->SimpleName;
1828 if (qfuncs) {
1829 qfunc = qfuncs->simple;
1830 qname = qfuncs->SimpleName;
1831 }
1832 } else {
1833 func = pfuncs->regular;
1834 undoFunc = rf_NullNodeUndoFunc;
1835 name = pfuncs->RegularName;
1836 if (qfuncs) {
1837 qfunc = qfuncs->regular;
1838 qname = qfuncs->RegularName;
1839 }
1840 }
1841 /*
1842 * Initialize the xor nodes: params are {pda,buf} from {Rod,Wnd,Rop}
1843 * nodes, and raidPtr.
1844 */
1845 if (numParityNodes == 2) { /* Double-xor case. */
1846 for (i = 0; i < numParityNodes; i++) {
1847 /* No wakeup func for xor. */
1848 rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func,
1849 undoFunc, NULL, numParityNodes, numParityNodes +
1850 numDataNodes, 7, 1, dag_h, name, allocList);
1851 xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
1852 xorNodes[i].params[0] = readDataNodes[i].params[0];
1853 xorNodes[i].params[1] = readDataNodes[i].params[1];
1854 xorNodes[i].params[2] = readParityNodes[i].params[0];
1855 xorNodes[i].params[3] = readParityNodes[i].params[1];
1856 xorNodes[i].params[4] = writeDataNodes[i].params[0];
1857 xorNodes[i].params[5] = writeDataNodes[i].params[1];
1858 xorNodes[i].params[6].p = raidPtr;
1859 /* Use old parity buf as target buf. */
1860 xorNodes[i].results[0] = readParityNodes[i].params[1].p;
1861 if (nfaults == 2) {
1862 /* No wakeup func for xor. */
1863 rf_InitNode(&qNodes[i], rf_wait, RF_FALSE,
1864 qfunc, undoFunc, NULL, numParityNodes,
1865 numParityNodes + numDataNodes, 7, 1,
1866 dag_h, qname, allocList);
1867 qNodes[i].params[0] =
1868 readDataNodes[i].params[0];
1869 qNodes[i].params[1] =
1870 readDataNodes[i].params[1];
1871 qNodes[i].params[2] = readQNodes[i].params[0];
1872 qNodes[i].params[3] = readQNodes[i].params[1];
1873 qNodes[i].params[4] =
1874 writeDataNodes[i].params[0];
1875 qNodes[i].params[5] =
1876 writeDataNodes[i].params[1];
1877 qNodes[i].params[6].p = raidPtr;
1878 /* Use old Q buf as target buf. */
1879 qNodes[i].results[0] =
1880 readQNodes[i].params[1].p;
1881 }
1882 }
1883 } else {
1884 /* There is only one xor node in this case. */
1885 rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc,
1886 NULL, numParityNodes, numParityNodes + numDataNodes,
1887 (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h,
1888 name, allocList);
1889 xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
1890 for (i = 0; i < numDataNodes + 1; i++) {
1891 /* Set up params related to Rod and Rop nodes. */
1892 xorNodes[0].params[2 * i + 0] =
1893 readDataNodes[i].params[0]; /* pda */
1894 xorNodes[0].params[2 * i + 1] =
1895 readDataNodes[i].params[1]; /* buffer pointer */
1896 }
1897 for (i = 0; i < numDataNodes; i++) {
1898 /* Set up params related to Wnd and Wnp nodes. */
1899 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] =
1900 writeDataNodes[i].params[0]; /* pda */
1901 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] =
1902 writeDataNodes[i].params[1]; /* buffer pointer */
1903 }
1904 xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p =
1905 raidPtr; /* xor node needs to get at RAID information */
1906 xorNodes[0].results[0] = readParityNodes[0].params[1].p;
1907 if (nfaults == 2) {
1908 rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc,
1909 undoFunc, NULL, numParityNodes,
1910 numParityNodes + numDataNodes,
1911 (2 * (numDataNodes + numDataNodes + 1) + 1),
1912 1, dag_h, qname, allocList);
1913 for (i = 0; i < numDataNodes; i++) {
1914 /* Set up params related to Rod. */
1915 /* pda */
1916 qNodes[0].params[2 * i + 0] =
1917 readDataNodes[i].params[0];
1918 /* buffer pointer */
1919 qNodes[0].params[2 * i + 1] =
1920 readDataNodes[i].params[1];
1921 }
1922 /* And read old q. */
1923 qNodes[0].params[2 * numDataNodes + 0] =
1924 readQNodes[0].params[0]; /* pda */
1925 qNodes[0].params[2 * numDataNodes + 1] =
1926 readQNodes[0].params[1]; /* buffer pointer */
1927 for (i = 0; i < numDataNodes; i++) {
1928 /* Set up params related to Wnd nodes. */
1929 /* pda */
1930 qNodes[0].params
1931 [2 * (numDataNodes + 1 + i) + 0] =
1932 writeDataNodes[i].params[0];
1933 /* buffer pointer */
1934 qNodes[0].params
1935 [2 * (numDataNodes + 1 + i) + 1] =
1936 writeDataNodes[i].params[1];
1937 }
1938 /* Xor node needs to get at RAID information. */
1939 qNodes[0].params
1940 [2 * (numDataNodes + numDataNodes + 1)].p =
1941 raidPtr;
1942 qNodes[0].results[0] = readQNodes[0].params[1].p;
1943 }
1944 }
1945
1946 /* Initialize nodes which write new parity (Wnp). */
1947 pda = asmap->parityInfo;
1948 for (i = 0; i < numParityNodes; i++) {
1949 rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE,
1950 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1951 rf_GenericWakeupFunc, 1, numParityNodes,
1952 4, 0, dag_h, "Wnp", allocList);
1953 RF_ASSERT(pda != NULL);
1954 /* Param 1 (bufPtr) filled in by xor node. */
1955 writeParityNodes[i].params[0].p = pda;
1956 /* Buffer pointer for parity write operation. */
1957 writeParityNodes[i].params[1].p = xorNodes[i].results[0];
1958 writeParityNodes[i].params[2].v = parityStripeID;
1959 writeParityNodes[i].params[3].v =
1960 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1961
1962 if (lu_flag) {
1963 /* Initialize node to unlock the disk queue. */
1964 rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE,
1965 rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc,
1966 rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
1967 "Unp", allocList);
1968 unlockParityNodes[i].params[0].p =
1969 pda; /* Physical disk addr desc. */
1970 unlockParityNodes[i].params[1].v =
1971 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1972 0, lu_flag, which_ru);
1973 }
1974 pda = pda->next;
1975 }
1976
1977 /* Initialize nodes which write new Q (Wnq). */
1978 if (nfaults == 2) {
1979 pda = asmap->qInfo;
1980 for (i = 0; i < numParityNodes; i++) {
1981 rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE,
1982 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1983 rf_GenericWakeupFunc, 1, numParityNodes,
1984 4, 0, dag_h, "Wnq", allocList);
1985 RF_ASSERT(pda != NULL);
1986 /* Param 1 (bufPtr) filled in by xor node. */
1987 writeQNodes[i].params[0].p = pda;
1988 /* Buffer pointer for parity write operation. */
1989 writeQNodes[i].params[1].p = qNodes[i].results[0];
1990 writeQNodes[i].params[2].v = parityStripeID;
1991 writeQNodes[i].params[3].v =
1992 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1993 0, 0, which_ru);
1994
1995 if (lu_flag) {
1996 /* Initialize node to unlock the disk queue. */
1997 rf_InitNode(&unlockQNodes[i], rf_wait,
1998 RF_FALSE, rf_DiskUnlockFunc,
1999 rf_DiskUnlockUndoFunc,
2000 rf_GenericWakeupFunc, 1, 1, 2, 0,
2001 dag_h, "Unq", allocList);
2002 /* Physical disk addr desc. */
2003 unlockQNodes[i].params[0].p = pda;
2004 unlockQNodes[i].params[1].v =
2005 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
2006 0, lu_flag, which_ru);
2007 }
2008 pda = pda->next;
2009 }
2010 }
2011 /* Step 4. Connect the nodes. */
2012
2013 /* Connect header to block node. */
2014 dag_h->succedents[0] = blockNode;
2015
2016 /* Connect block node to read old data nodes. */
2017 RF_ASSERT(blockNode->numSuccedents ==
2018 (numDataNodes + (numParityNodes * nfaults)));
2019 for (i = 0; i < numDataNodes; i++) {
2020 blockNode->succedents[i] = &readDataNodes[i];
2021 RF_ASSERT(readDataNodes[i].numAntecedents == 1);
2022 readDataNodes[i].antecedents[0] = blockNode;
2023 readDataNodes[i].antType[0] = rf_control;
2024 }
2025
2026 /* Connect block node to read old parity nodes. */
2027 for (i = 0; i < numParityNodes; i++) {
2028 blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
2029 RF_ASSERT(readParityNodes[i].numAntecedents == 1);
2030 readParityNodes[i].antecedents[0] = blockNode;
2031 readParityNodes[i].antType[0] = rf_control;
2032 }
2033
2034 /* Connect block node to read old Q nodes. */
2035 if (nfaults == 2)
2036 for (i = 0; i < numParityNodes; i++) {
2037 blockNode->succedents[numDataNodes +
2038 numParityNodes + i] = &readQNodes[i];
2039 RF_ASSERT(readQNodes[i].numAntecedents == 1);
2040 readQNodes[i].antecedents[0] = blockNode;
2041 readQNodes[i].antType[0] = rf_control;
2042 }
2043
2044 /* Connect read old data nodes to write new data nodes. */
2045 for (i = 0; i < numDataNodes; i++) {
2046 RF_ASSERT(readDataNodes[i].numSuccedents ==
2047 ((nfaults * numParityNodes) + 1));
2048 RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
2049 readDataNodes[i].succedents[0] = &writeDataNodes[i];
2050 writeDataNodes[i].antecedents[0] = &readDataNodes[i];
2051 writeDataNodes[i].antType[0] = rf_antiData;
2052 }
2053
2054 /* Connect read old data nodes to xor nodes. */
2055 for (i = 0; i < numDataNodes; i++) {
2056 for (j = 0; j < numParityNodes; j++) {
2057 RF_ASSERT(xorNodes[j].numAntecedents ==
2058 numDataNodes + numParityNodes);
2059 readDataNodes[i].succedents[1 + j] = &xorNodes[j];
2060 xorNodes[j].antecedents[i] = &readDataNodes[i];
2061 xorNodes[j].antType[i] = rf_trueData;
2062 }
2063 }
2064
2065 /* Connect read old data nodes to q nodes. */
2066 if (nfaults == 2)
2067 for (i = 0; i < numDataNodes; i++)
2068 for (j = 0; j < numParityNodes; j++) {
2069 RF_ASSERT(qNodes[j].numAntecedents ==
2070 numDataNodes + numParityNodes);
2071 readDataNodes[i].succedents
2072 [1 + numParityNodes + j] = &qNodes[j];
2073 qNodes[j].antecedents[i] = &readDataNodes[i];
2074 qNodes[j].antType[i] = rf_trueData;
2075 }
2076
2077 /* Connect read old parity nodes to xor nodes. */
2078 for (i = 0; i < numParityNodes; i++) {
2079 for (j = 0; j < numParityNodes; j++) {
2080 RF_ASSERT(readParityNodes[i].numSuccedents ==
2081 numParityNodes);
2082 readParityNodes[i].succedents[j] = &xorNodes[j];
2083 xorNodes[j].antecedents[numDataNodes + i] =
2084 &readParityNodes[i];
2085 xorNodes[j].antType[numDataNodes + i] = rf_trueData;
2086 }
2087 }
2088
2089 /* Connect read old q nodes to q nodes. */
2090 if (nfaults == 2)
2091 for (i = 0; i < numParityNodes; i++) {
2092 for (j = 0; j < numParityNodes; j++) {
2093 RF_ASSERT(readQNodes[i].numSuccedents ==
2094 numParityNodes);
2095 readQNodes[i].succedents[j] = &qNodes[j];
2096 qNodes[j].antecedents[numDataNodes + i] =
2097 &readQNodes[i];
2098 qNodes[j].antType[numDataNodes + i] =
2099 rf_trueData;
2100 }
2101 }
2102
2103 /* Connect xor nodes to the write new parity nodes. */
2104 for (i = 0; i < numParityNodes; i++) {
2105 RF_ASSERT(writeParityNodes[i].numAntecedents == numParityNodes);
2106 for (j = 0; j < numParityNodes; j++) {
2107 RF_ASSERT(xorNodes[j].numSuccedents == numParityNodes);
2108 xorNodes[i].succedents[j] = &writeParityNodes[j];
2109 writeParityNodes[j].antecedents[i] = &xorNodes[i];
2110 writeParityNodes[j].antType[i] = rf_trueData;
2111 }
2112 }
2113
2114 /* Connect q nodes to the write new q nodes. */
2115 if (nfaults == 2)
2116 for (i = 0; i < numParityNodes; i++) {
2117 RF_ASSERT(writeQNodes[i].numAntecedents ==
2118 numParityNodes);
2119 for (j = 0; j < numParityNodes; j++) {
2120 RF_ASSERT(qNodes[j].numSuccedents == 1);
2121 qNodes[i].succedents[j] = &writeQNodes[j];
2122 writeQNodes[j].antecedents[i] = &qNodes[i];
2123 writeQNodes[j].antType[i] = rf_trueData;
2124 }
2125 }
2126
2127 RF_ASSERT(termNode->numAntecedents ==
2128 (numDataNodes + (nfaults * numParityNodes)));
2129 RF_ASSERT(termNode->numSuccedents == 0);
2130 for (i = 0; i < numDataNodes; i++) {
2131 if (lu_flag) {
2132 /* Connect write new data nodes to unlock nodes. */
2133 RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
2134 RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
2135 writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
2136 unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
2137 unlockDataNodes[i].antType[0] = rf_control;
2138
2139 /* Connect unlock nodes to term nodes. */
2140 RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
2141 unlockDataNodes[i].succedents[0] = termNode;
2142 termNode->antecedents[i] = &unlockDataNodes[i];
2143 termNode->antType[i] = rf_control;
2144 } else {
2145 /* Connect write new data nodes to term node. */
2146 RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
2147 RF_ASSERT(termNode->numAntecedents ==
2148 (numDataNodes + (nfaults * numParityNodes)));
2149 writeDataNodes[i].succedents[0] = termNode;
2150 termNode->antecedents[i] = &writeDataNodes[i];
2151 termNode->antType[i] = rf_control;
2152 }
2153 }
2154
2155 for (i = 0; i < numParityNodes; i++) {
2156 if (lu_flag) {
2157 /* Connect write new parity nodes to unlock nodes. */
2158 RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
2159 RF_ASSERT(unlockParityNodes[i].numAntecedents == 1);
2160 writeParityNodes[i].succedents[0] =
2161 &unlockParityNodes[i];
2162 unlockParityNodes[i].antecedents[0] =
2163 &writeParityNodes[i];
2164 unlockParityNodes[i].antType[0] = rf_control;
2165
2166 /* Connect unlock nodes to term node. */
2167 RF_ASSERT(unlockParityNodes[i].numSuccedents == 1);
2168 unlockParityNodes[i].succedents[0] = termNode;
2169 termNode->antecedents[numDataNodes + i] =
2170 &unlockParityNodes[i];
2171 termNode->antType[numDataNodes + i] = rf_control;
2172 } else {
2173 RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
2174 writeParityNodes[i].succedents[0] = termNode;
2175 termNode->antecedents[numDataNodes + i] =
2176 &writeParityNodes[i];
2177 termNode->antType[numDataNodes + i] = rf_control;
2178 }
2179 }
2180
2181 if (nfaults == 2)
2182 for (i = 0; i < numParityNodes; i++) {
2183 if (lu_flag) {
2184 /* Connect write new Q nodes to unlock nodes. */
2185 RF_ASSERT(writeQNodes[i].numSuccedents == 1);
2186 RF_ASSERT(unlockQNodes[i].numAntecedents == 1);
2187 writeQNodes[i].succedents[0] = &unlockQNodes[i];
2188 unlockQNodes[i].antecedents[0] =
2189 &writeQNodes[i];
2190 unlockQNodes[i].antType[0] = rf_control;
2191
2192 /* Connect unlock nodes to unblock node. */
2193 RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
2194 unlockQNodes[i].succedents[0] = termNode;
2195 termNode->antecedents[numDataNodes +
2196 numParityNodes + i] = &unlockQNodes[i];
2197 termNode->antType[numDataNodes +
2198 numParityNodes + i] = rf_control;
2199 } else {
2200 RF_ASSERT(writeQNodes[i].numSuccedents == 1);
2201 writeQNodes[i].succedents[0] = termNode;
2202 termNode->antecedents[numDataNodes +
2203 numParityNodes + i] = &writeQNodes[i];
2204 termNode->antType[numDataNodes +
2205 numParityNodes + i] = rf_control;
2206 }
2207 }
2208 }
2209
2210
2211
2212 /*****************************************************************************
2213 * Create a write graph (fault-free or degraded) for RAID level 1.
2214 *
2215 * Hdr Nil -> Wpd -> Nil -> Trm
2216 * Nil -> Wsd ->
2217 *
2218 * The "Wpd" node writes data to the primary copy in the mirror pair.
2219 * The "Wsd" node writes data to the secondary copy in the mirror pair.
2220 *
2221 * Parameters: raidPtr - description of the physical array
2222 * asmap - logical & physical addresses for this access
2223 * bp - buffer ptr (holds write data)
2224 * flags - general flags (e.g. disk locking)
2225 * allocList - list of memory allocated in DAG creation
2226 *****************************************************************************/
2227
2228 void
rf_CreateRaidOneWriteDAGFwd(RF_Raid_t * raidPtr,RF_AccessStripeMap_t * asmap,RF_DagHeader_t * dag_h,void * bp,RF_RaidAccessFlags_t flags,RF_AllocListElem_t * allocList)2229 rf_CreateRaidOneWriteDAGFwd(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
2230 RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
2231 RF_AllocListElem_t *allocList)
2232 {
2233 RF_DagNode_t *blockNode, *unblockNode, *termNode;
2234 RF_DagNode_t *nodes, *wndNode, *wmirNode;
2235 int nWndNodes, nWmirNodes, i;
2236 RF_ReconUnitNum_t which_ru;
2237 RF_PhysDiskAddr_t *pda, *pdaP;
2238 RF_StripeNum_t parityStripeID;
2239
2240 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
2241 asmap->raidAddress, &which_ru);
2242 if (rf_dagDebug) {
2243 printf("[Creating RAID level 1 write DAG]\n");
2244 }
2245 /* 2 implies access not SU aligned. */
2246 nWmirNodes = (asmap->parityInfo->next) ? 2 : 1;
2247 nWndNodes = (asmap->physInfo->next) ? 2 : 1;
2248
2249 /* Alloc the Wnd nodes and the Wmir node. */
2250 if (asmap->numDataFailed == 1)
2251 nWndNodes--;
2252 if (asmap->numParityFailed == 1)
2253 nWmirNodes--;
2254
2255 /*
2256 * Total number of nodes = nWndNodes + nWmirNodes +
2257 * (block + unblock + terminator)
2258 */
2259 RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3,
2260 sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
2261 i = 0;
2262 wndNode = &nodes[i];
2263 i += nWndNodes;
2264 wmirNode = &nodes[i];
2265 i += nWmirNodes;
2266 blockNode = &nodes[i];
2267 i += 1;
2268 unblockNode = &nodes[i];
2269 i += 1;
2270 termNode = &nodes[i];
2271 i += 1;
2272 RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));
2273
2274 /* This dag can commit immediately. */
2275 dag_h->numCommitNodes = 0;
2276 dag_h->numCommits = 0;
2277 dag_h->numSuccedents = 1;
2278
2279 /* Initialize the unblock and term nodes. */
2280 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
2281 rf_NullNodeUndoFunc, NULL, (nWndNodes + nWmirNodes),
2282 0, 0, 0, dag_h, "Nil", allocList);
2283 rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
2284 rf_NullNodeUndoFunc, NULL, 1, (nWndNodes + nWmirNodes),
2285 0, 0, dag_h, "Nil", allocList);
2286 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
2287 rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
2288
2289 /* Initialize the wnd nodes. */
2290 if (nWndNodes > 0) {
2291 pda = asmap->physInfo;
2292 for (i = 0; i < nWndNodes; i++) {
2293 rf_InitNode(&wndNode[i], rf_wait, RF_FALSE,
2294 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
2295 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
2296 "Wpd", allocList);
2297 RF_ASSERT(pda != NULL);
2298 wndNode[i].params[0].p = pda;
2299 wndNode[i].params[1].p = pda->bufPtr;
2300 wndNode[i].params[2].v = parityStripeID;
2301 wndNode[i].params[3].v =
2302 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
2303 0, 0, which_ru);
2304 pda = pda->next;
2305 }
2306 RF_ASSERT(pda == NULL);
2307 }
2308 /* Initialize the mirror nodes. */
2309 if (nWmirNodes > 0) {
2310 pda = asmap->physInfo;
2311 pdaP = asmap->parityInfo;
2312 for (i = 0; i < nWmirNodes; i++) {
2313 rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE,
2314 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
2315 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
2316 "Wsd", allocList);
2317 RF_ASSERT(pda != NULL);
2318 wmirNode[i].params[0].p = pdaP;
2319 wmirNode[i].params[1].p = pda->bufPtr;
2320 wmirNode[i].params[2].v = parityStripeID;
2321 wmirNode[i].params[3].v =
2322 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
2323 0, 0, which_ru);
2324 pda = pda->next;
2325 pdaP = pdaP->next;
2326 }
2327 RF_ASSERT(pda == NULL);
2328 RF_ASSERT(pdaP == NULL);
2329 }
2330 /* Link the header node to the block node. */
2331 RF_ASSERT(dag_h->numSuccedents == 1);
2332 RF_ASSERT(blockNode->numAntecedents == 0);
2333 dag_h->succedents[0] = blockNode;
2334
2335 /* Link the block node to the write nodes. */
2336 RF_ASSERT(blockNode->numSuccedents == (nWndNodes + nWmirNodes));
2337 for (i = 0; i < nWndNodes; i++) {
2338 RF_ASSERT(wndNode[i].numAntecedents == 1);
2339 blockNode->succedents[i] = &wndNode[i];
2340 wndNode[i].antecedents[0] = blockNode;
2341 wndNode[i].antType[0] = rf_control;
2342 }
2343 for (i = 0; i < nWmirNodes; i++) {
2344 RF_ASSERT(wmirNode[i].numAntecedents == 1);
2345 blockNode->succedents[i + nWndNodes] = &wmirNode[i];
2346 wmirNode[i].antecedents[0] = blockNode;
2347 wmirNode[i].antType[0] = rf_control;
2348 }
2349
2350 /* Link the write nodes to the unblock node. */
2351 RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
2352 for (i = 0; i < nWndNodes; i++) {
2353 RF_ASSERT(wndNode[i].numSuccedents == 1);
2354 wndNode[i].succedents[0] = unblockNode;
2355 unblockNode->antecedents[i] = &wndNode[i];
2356 unblockNode->antType[i] = rf_control;
2357 }
2358 for (i = 0; i < nWmirNodes; i++) {
2359 RF_ASSERT(wmirNode[i].numSuccedents == 1);
2360 wmirNode[i].succedents[0] = unblockNode;
2361 unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
2362 unblockNode->antType[i + nWndNodes] = rf_control;
2363 }
2364
2365 /* Link the unblock node to the term node. */
2366 RF_ASSERT(unblockNode->numSuccedents == 1);
2367 RF_ASSERT(termNode->numAntecedents == 1);
2368 RF_ASSERT(termNode->numSuccedents == 0);
2369 unblockNode->succedents[0] = termNode;
2370 termNode->antecedents[0] = unblockNode;
2371 termNode->antType[0] = rf_control;
2372
2373 return;
2374 }
2375