1 /* ndbio.cpp - get/set/del data for NDB */
2 /* $OpenLDAP$ */
3 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
4  *
5  * Copyright 2008-2021 The OpenLDAP Foundation.
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted only as authorized by the OpenLDAP
10  * Public License.
11  *
12  * A copy of this license is available in the file LICENSE in the
13  * top-level directory of the distribution or, alternatively, at
14  * <http://www.OpenLDAP.org/license.html>.
15  */
16 /* ACKNOWLEDGEMENTS:
17  * This work was initially developed by Howard Chu for inclusion
18  * in OpenLDAP Software. This work was sponsored by MySQL.
19  */
20 
21 #include "portable.h"
22 
23 #include <stdio.h>
24 #include <ac/string.h>
25 #include <ac/errno.h>
26 #include <lutil.h>
27 
28 #include "back-ndb.h"
29 
30 /* For reference only */
31 typedef struct MedVar {
32           Int16 len;          /* length is always little-endian */
33           char buf[1024];
34 } MedVar;
35 
/* Forward declarations of the callbacks handed to avl_find/avl_insert,
 * declared inside a C linkage block.
 */
extern "C" {
	static int ndb_name_cmp( const void *v1, const void *v2 );
	static int ndb_oc_dup_err( void *v1, void *v2 );
}
40 
41 static int
ndb_name_cmp(const void * v1,const void * v2)42 ndb_name_cmp( const void *v1, const void *v2 )
43 {
44           NdbOcInfo *oc1 = (NdbOcInfo *)v1, *oc2 = (NdbOcInfo *)v2;
45           return ber_bvstrcasecmp( &oc1->no_name, &oc2->no_name );
46 }
47 
48 static int
ndb_oc_dup_err(void * v1,void * v2)49 ndb_oc_dup_err( void *v1, void *v2 )
50 {
51           NdbOcInfo *oc = (NdbOcInfo *)v2;
52 
53           oc->no_oc = (ObjectClass *)v1;
54           return -1;
55 }
56 
57 /* Find an existing NdbAttrInfo */
58 extern "C" NdbAttrInfo *
ndb_ai_find(struct ndb_info * ni,AttributeType * at)59 ndb_ai_find( struct ndb_info *ni, AttributeType *at )
60 {
61           NdbAttrInfo atmp;
62           atmp.na_name = at->sat_cname;
63 
64           return (NdbAttrInfo *)avl_find( ni->ni_ai_tree, &atmp, ndb_name_cmp );
65 }
66 
67 /* Find or create an NdbAttrInfo */
68 extern "C" NdbAttrInfo *
ndb_ai_get(struct ndb_info * ni,struct berval * aname)69 ndb_ai_get( struct ndb_info *ni, struct berval *aname )
70 {
71           NdbAttrInfo atmp, *ai;
72           atmp.na_name = *aname;
73 
74           ai = (NdbAttrInfo *)avl_find( ni->ni_ai_tree, &atmp, ndb_name_cmp );
75           if ( !ai ) {
76                     const char *text;
77                     AttributeDescription *ad = NULL;
78 
79                     if ( slap_bv2ad( aname, &ad, &text ))
80                               return NULL;
81 
82                     ai = (NdbAttrInfo *)ch_malloc( sizeof( NdbAttrInfo ));
83                     ai->na_desc = ad;
84                     ai->na_attr = ai->na_desc->ad_type;
85                     ai->na_name = ai->na_attr->sat_cname;
86                     ai->na_oi = NULL;
87                     ai->na_flag = 0;
88                     ai->na_ixcol = 0;
89                     ai->na_len = ai->na_attr->sat_atype.at_syntax_len;
90                     /* Reasonable default */
91                     if ( !ai->na_len ) {
92                               if ( ai->na_attr->sat_syntax == slap_schema.si_syn_distinguishedName )
93                                         ai->na_len = 1024;
94                               else
95                                         ai->na_len = 128;
96                     }
97                     /* Arbitrary limit */
98                     if ( ai->na_len > 1024 )
99                               ai->na_len = 1024;
100                     avl_insert( &ni->ni_ai_tree, ai, ndb_name_cmp, avl_dup_error );
101           }
102           return ai;
103 }
104 
105 static int
ndb_ai_check(struct ndb_info * ni,NdbOcInfo * oci,AttributeType ** attrs,char ** ptr,int * col,int create)106 ndb_ai_check( struct ndb_info *ni, NdbOcInfo *oci, AttributeType **attrs, char **ptr, int *col,
107           int create )
108 {
109           NdbAttrInfo *ai;
110           int i;
111 
112           for ( i=0; attrs[i]; i++ ) {
113                     if ( attrs[i] == slap_schema.si_ad_objectClass->ad_type )
114                               continue;
115                     /* skip attrs that are in a superior */
116                     if ( oci->no_oc && oci->no_oc->soc_sups ) {
117                               int j, k, found=0;
118                               ObjectClass *oc;
119                               for ( j=0; oci->no_oc->soc_sups[j]; j++ ) {
120                                         oc = oci->no_oc->soc_sups[j];
121                                         if ( oc->soc_kind == LDAP_SCHEMA_ABSTRACT )
122                                                   continue;
123                                         if ( oc->soc_required ) {
124                                                   for ( k=0; oc->soc_required[k]; k++ ) {
125                                                             if ( attrs[i] == oc->soc_required[k] ) {
126                                                                       found = 1;
127                                                                       break;
128                                                             }
129                                                   }
130                                                   if ( found ) break;
131                                         }
132                                         if ( oc->soc_allowed ) {
133                                                   for ( k=0; oc->soc_allowed[k]; k++ ) {
134                                                             if ( attrs[i] == oc->soc_allowed[k] ) {
135                                                                       found = 1;
136                                                                       break;
137                                                             }
138                                                   }
139                                                   if ( found ) break;
140                                         }
141                               }
142                               if ( found )
143                                         continue;
144                     }
145 
146                     ai = ndb_ai_get( ni, &attrs[i]->sat_cname );
147                     if ( !ai ) {
148                               /* can never happen */
149                               return LDAP_OTHER;
150                     }
151 
152                     /* An attrset may have already been connected */
153                     if (( oci->no_flag & NDB_INFO_ATSET ) && ai->na_oi == oci )
154                               continue;
155 
156                     /* An indexed attr is defined before its OC is */
157                     if ( !ai->na_oi ) {
158                               ai->na_oi = oci;
159                               ai->na_column = (*col)++;
160                     }
161 
162                     oci->no_attrs[oci->no_nattrs++] = ai;
163 
164                     /* An attrset attr may already be defined */
165                     if ( ai->na_oi != oci ) {
166                               int j;
167                               for ( j=0; j<oci->no_nsets; j++ )
168                                         if ( oci->no_sets[j] == ai->na_oi ) break;
169                               if ( j >= oci->no_nsets ) {
170                                         /* FIXME: data loss if more sets are in use */
171                                         if ( oci->no_nsets < NDB_MAX_OCSETS ) {
172                                                   oci->no_sets[oci->no_nsets++] = ai->na_oi;
173                                         }
174                               }
175                               continue;
176                     }
177 
178                     if ( create ) {
179                               if ( ai->na_flag & NDB_INFO_ATBLOB ) {
180                                         *ptr += sprintf( *ptr, ", `%s` BLOB", ai->na_attr->sat_cname.bv_val );
181                               } else {
182                                         *ptr += sprintf( *ptr, ", `%s` VARCHAR(%d)", ai->na_attr->sat_cname.bv_val,
183                                                   ai->na_len );
184                               }
185                     }
186           }
187           return 0;
188 }
189 
190 static int
ndb_oc_create(struct ndb_info * ni,NdbOcInfo * oci,int create)191 ndb_oc_create( struct ndb_info *ni, NdbOcInfo *oci, int create )
192 {
193           char buf[4096], *ptr;
194           int i, rc = 0, col;
195 
196           if ( create ) {
197                     ptr = buf + sprintf( buf,
198                               "CREATE TABLE `%s` (eid bigint unsigned NOT NULL, vid int unsigned NOT NULL",
199                               oci->no_table.bv_val );
200           }
201 
202           col = 0;
203           if ( oci->no_oc->soc_required ) {
204                     for ( i=0; oci->no_oc->soc_required[i]; i++ );
205                     col += i;
206           }
207           if ( oci->no_oc->soc_allowed ) {
208                     for ( i=0; oci->no_oc->soc_allowed[i]; i++ );
209                     col += i;
210           }
211           /* assume all are present */
212           oci->no_attrs = (struct ndb_attrinfo **)ch_malloc( col * sizeof(struct ndb_attrinfo *));
213 
214           col = 2;
215           ldap_pvt_thread_rdwr_wlock( &ni->ni_ai_rwlock );
216           if ( oci->no_oc->soc_required ) {
217                     rc = ndb_ai_check( ni, oci, oci->no_oc->soc_required, &ptr, &col, create );
218           }
219           if ( !rc && oci->no_oc->soc_allowed ) {
220                     rc = ndb_ai_check( ni, oci, oci->no_oc->soc_allowed, &ptr, &col, create );
221           }
222           ldap_pvt_thread_rdwr_wunlock( &ni->ni_ai_rwlock );
223 
224           /* shrink down to just the needed size */
225           oci->no_attrs = (struct ndb_attrinfo **)ch_realloc( oci->no_attrs,
226                     oci->no_nattrs * sizeof(struct ndb_attrinfo *));
227 
228           if ( create ) {
229                     ptr = lutil_strcopy( ptr, ", PRIMARY KEY(eid, vid) ) ENGINE=ndb PARTITION BY KEY(eid)" );
230                     rc = mysql_real_query( &ni->ni_sql, buf, ptr - buf );
231                     if ( rc ) {
232                               Debug( LDAP_DEBUG_ANY,
233                                         "ndb_oc_create: CREATE TABLE %s failed, %s (%d)\n",
234                                         oci->no_table.bv_val, mysql_error(&ni->ni_sql), mysql_errno(&ni->ni_sql) );
235                     }
236           }
237           return rc;
238 }
239 
240 /* Read table definitions from the DB and populate ObjectClassInfo */
241 extern "C" int
ndb_oc_read(struct ndb_info * ni,const NdbDictionary::Dictionary * myDict)242 ndb_oc_read( struct ndb_info *ni, const NdbDictionary::Dictionary *myDict )
243 {
244           const NdbDictionary::Table *myTable;
245           const NdbDictionary::Column *myCol;
246           NdbOcInfo *oci, octmp;
247           NdbAttrInfo *ai;
248           ObjectClass *oc;
249           NdbDictionary::Dictionary::List myList;
250           struct berval bv;
251           int i, j, rc, col;
252 
253           rc = myDict->listObjects( myList, NdbDictionary::Object::UserTable );
254           /* Populate our objectClass structures */
255           for ( i=0; i<myList.count; i++ ) {
256                     /* Ignore other DBs */
257                     if ( strcmp( myList.elements[i].database, ni->ni_dbname ))
258                               continue;
259                     /* Ignore internal tables */
260                     if ( !strncmp( myList.elements[i].name, "OL_", 3 ))
261                               continue;
262                     ber_str2bv( myList.elements[i].name, 0, 0, &octmp.no_name );
263                     oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
264                     if ( oci )
265                               continue;
266 
267                     oc = oc_bvfind( &octmp.no_name );
268                     if ( !oc ) {
269                               /* undefined - shouldn't happen */
270                               continue;
271                     }
272                     myTable = myDict->getTable( myList.elements[i].name );
273                     oci = (NdbOcInfo *)ch_malloc( sizeof( NdbOcInfo )+oc->soc_cname.bv_len+1 );
274                     oci->no_table.bv_val = (char *)(oci+1);
275                     strcpy( oci->no_table.bv_val, oc->soc_cname.bv_val );
276                     oci->no_table.bv_len = oc->soc_cname.bv_len;
277                     oci->no_name = oci->no_table;
278                     oci->no_oc = oc;
279                     oci->no_flag = 0;
280                     oci->no_nsets = 0;
281                     oci->no_nattrs = 0;
282                     col = 0;
283                     /* Make space for all attrs, even tho sups will be dropped */
284                     if ( oci->no_oc->soc_required ) {
285                               for ( j=0; oci->no_oc->soc_required[j]; j++ );
286                               col = j;
287                     }
288                     if ( oci->no_oc->soc_allowed ) {
289                               for ( j=0; oci->no_oc->soc_allowed[j]; j++ );
290                               col += j;
291                     }
292                     oci->no_attrs = (struct ndb_attrinfo **)ch_malloc( col * sizeof(struct ndb_attrinfo *));
293                     avl_insert( &ni->ni_oc_tree, oci, ndb_name_cmp, avl_dup_error );
294 
295                     col = myTable->getNoOfColumns();
296                     /* Skip 0 and 1, eid and vid */
297                     for ( j = 2; j<col; j++ ) {
298                               myCol = myTable->getColumn( j );
299                               ber_str2bv( myCol->getName(), 0, 0, &bv );
300                               ai = ndb_ai_get( ni, &bv );
301                               /* shouldn't happen */
302                               if ( !ai )
303                                         continue;
304                               ai->na_oi = oci;
305                               ai->na_column = j;
306                               ai->na_len = myCol->getLength();
307                               if ( myCol->getType() == NdbDictionary::Column::Blob )
308                                         ai->na_flag |= NDB_INFO_ATBLOB;
309                     }
310           }
311           /* Link to any attrsets */
312           for ( i=0; i<myList.count; i++ ) {
313                     /* Ignore other DBs */
314                     if ( strcmp( myList.elements[i].database, ni->ni_dbname ))
315                               continue;
316                     /* Ignore internal tables */
317                     if ( !strncmp( myList.elements[i].name, "OL_", 3 ))
318                               continue;
319                     ber_str2bv( myList.elements[i].name, 0, 0, &octmp.no_name );
320                     oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
321                     /* shouldn't happen */
322                     if ( !oci )
323                               continue;
324                     col = 2;
325                     if ( oci->no_oc->soc_required ) {
326                               rc = ndb_ai_check( ni, oci, oci->no_oc->soc_required, NULL, &col, 0 );
327                     }
328                     if ( oci->no_oc->soc_allowed ) {
329                               rc = ndb_ai_check( ni, oci, oci->no_oc->soc_allowed, NULL, &col, 0 );
330                     }
331                     /* shrink down to just the needed size */
332                     oci->no_attrs = (struct ndb_attrinfo **)ch_realloc( oci->no_attrs,
333                               oci->no_nattrs * sizeof(struct ndb_attrinfo *));
334           }
335           return 0;
336 }
337 
338 static int
ndb_oc_list(struct ndb_info * ni,const NdbDictionary::Dictionary * myDict,struct berval * oname,int implied,NdbOcs * out)339 ndb_oc_list( struct ndb_info *ni, const NdbDictionary::Dictionary *myDict,
340           struct berval *oname, int implied, NdbOcs *out )
341 {
342           const NdbDictionary::Table *myTable;
343           NdbOcInfo *oci, octmp;
344           ObjectClass *oc;
345           int i, rc;
346 
347           /* shortcut top */
348           if ( ber_bvstrcasecmp( oname, &slap_schema.si_oc_top->soc_cname )) {
349                     octmp.no_name = *oname;
350                     oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
351                     if ( oci ) {
352                               oc = oci->no_oc;
353                     } else {
354                               oc = oc_bvfind( oname );
355                               if ( !oc ) {
356                                         /* undefined - shouldn't happen */
357                                         return LDAP_INVALID_SYNTAX;
358                               }
359                     }
360                     if ( oc->soc_sups ) {
361                               int i;
362 
363                               for ( i=0; oc->soc_sups[i]; i++ ) {
364                                         rc = ndb_oc_list( ni, myDict, &oc->soc_sups[i]->soc_cname, 1, out );
365                                         if ( rc ) return rc;
366                               }
367                     }
368           } else {
369                     oc = slap_schema.si_oc_top;
370           }
371           /* Only insert once */
372           for ( i=0; i<out->no_ntext; i++ )
373                     if ( out->no_text[i].bv_val == oc->soc_cname.bv_val )
374                               break;
375           if ( i == out->no_ntext ) {
376                     for ( i=0; i<out->no_nitext; i++ )
377                               if ( out->no_itext[i].bv_val == oc->soc_cname.bv_val )
378                                         break;
379                     if ( i == out->no_nitext ) {
380                               if ( implied )
381                                         out->no_itext[out->no_nitext++] = oc->soc_cname;
382                               else
383                                         out->no_text[out->no_ntext++] = oc->soc_cname;
384                     }
385           }
386 
387           /* ignore top, etc... */
388           if ( oc->soc_kind == LDAP_SCHEMA_ABSTRACT )
389                     return 0;
390 
391           if ( !oci ) {
392                     ldap_pvt_thread_rdwr_runlock( &ni->ni_oc_rwlock );
393                     oci = (NdbOcInfo *)ch_malloc( sizeof( NdbOcInfo )+oc->soc_cname.bv_len+1 );
394                     oci->no_table.bv_val = (char *)(oci+1);
395                     strcpy( oci->no_table.bv_val, oc->soc_cname.bv_val );
396                     oci->no_table.bv_len = oc->soc_cname.bv_len;
397                     oci->no_name = oci->no_table;
398                     oci->no_oc = oc;
399                     oci->no_flag = 0;
400                     oci->no_nsets = 0;
401                     oci->no_nattrs = 0;
402                     ldap_pvt_thread_rdwr_wlock( &ni->ni_oc_rwlock );
403                     if ( avl_insert( &ni->ni_oc_tree, oci, ndb_name_cmp, ndb_oc_dup_err )) {
404                               octmp.no_oc = oci->no_oc;
405                               ch_free( oci );
406                               oci = (NdbOcInfo *)octmp.no_oc;
407                     }
408                     /* see if the oc table already exists in the DB */
409                     myTable = myDict->getTable( oci->no_table.bv_val );
410                     rc = ndb_oc_create( ni, oci, myTable == NULL );
411                     ldap_pvt_thread_rdwr_wunlock( &ni->ni_oc_rwlock );
412                     ldap_pvt_thread_rdwr_rlock( &ni->ni_oc_rwlock );
413                     if ( rc ) return rc;
414           }
415           /* Only insert once */
416           for ( i=0; i<out->no_ninfo; i++ )
417                     if ( out->no_info[i] == oci )
418                               break;
419           if ( i == out->no_ninfo )
420                     out->no_info[out->no_ninfo++] = oci;
421           return 0;
422 }
423 
424 extern "C" int
ndb_aset_get(struct ndb_info * ni,struct berval * sname,struct berval * attrs,NdbOcInfo ** ret)425 ndb_aset_get( struct ndb_info *ni, struct berval *sname, struct berval *attrs, NdbOcInfo **ret )
426 {
427           NdbOcInfo *oci, octmp;
428           int i, rc;
429 
430           octmp.no_name = *sname;
431           oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
432           if ( oci )
433                     return LDAP_ALREADY_EXISTS;
434 
435           for ( i=0; !BER_BVISNULL( &attrs[i] ); i++ ) {
436                     if ( !at_bvfind( &attrs[i] ))
437                               return LDAP_NO_SUCH_ATTRIBUTE;
438           }
439           i++;
440 
441           oci = (NdbOcInfo *)ch_calloc( 1, sizeof( NdbOcInfo ) + sizeof( ObjectClass ) +
442                     i*sizeof(AttributeType *) + sname->bv_len+1 );
443           oci->no_oc = (ObjectClass *)(oci+1);
444           oci->no_oc->soc_required = (AttributeType **)(oci->no_oc+1);
445           oci->no_table.bv_val = (char *)(oci->no_oc->soc_required+i);
446 
447           for ( i=0; !BER_BVISNULL( &attrs[i] ); i++ )
448                     oci->no_oc->soc_required[i] = at_bvfind( &attrs[i] );
449 
450           strcpy( oci->no_table.bv_val, sname->bv_val );
451           oci->no_table.bv_len = sname->bv_len;
452           oci->no_name = oci->no_table;
453           oci->no_oc->soc_cname = oci->no_name;
454           oci->no_flag = NDB_INFO_ATSET;
455 
456           if ( !ber_bvcmp( sname, &slap_schema.si_oc_extensibleObject->soc_cname ))
457                     oci->no_oc->soc_kind = slap_schema.si_oc_extensibleObject->soc_kind;
458 
459           rc = ndb_oc_create( ni, oci, 0 );
460           if ( !rc )
461                     rc = avl_insert( &ni->ni_oc_tree, oci, ndb_name_cmp, avl_dup_error );
462           if ( rc ) {
463                     ch_free( oci );
464           } else {
465                     *ret = oci;
466           }
467           return rc;
468 }
469 
470 extern "C" int
ndb_aset_create(struct ndb_info * ni,NdbOcInfo * oci)471 ndb_aset_create( struct ndb_info *ni, NdbOcInfo *oci )
472 {
473           char buf[4096], *ptr;
474           NdbAttrInfo *ai;
475           int i;
476 
477           ptr = buf + sprintf( buf,
478                     "CREATE TABLE IF NOT EXISTS `%s` (eid bigint unsigned NOT NULL, vid int unsigned NOT NULL",
479                     oci->no_table.bv_val );
480 
481           for ( i=0; i<oci->no_nattrs; i++ ) {
482                     if ( oci->no_attrs[i]->na_oi != oci )
483                               continue;
484                     ai = oci->no_attrs[i];
485                     ptr += sprintf( ptr, ", `%s` VARCHAR(%d)", ai->na_attr->sat_cname.bv_val,
486                               ai->na_len );
487                     if ( ai->na_flag & NDB_INFO_INDEX ) {
488                               ptr += sprintf( ptr, ", INDEX (`%s`)", ai->na_attr->sat_cname.bv_val );
489                     }
490           }
491           ptr = lutil_strcopy( ptr, ", PRIMARY KEY(eid, vid) ) ENGINE=ndb PARTITION BY KEY(eid)" );
492           i = mysql_real_query( &ni->ni_sql, buf, ptr - buf );
493           if ( i ) {
494                     Debug( LDAP_DEBUG_ANY,
495                               "ndb_aset_create: CREATE TABLE %s failed, %s (%d)\n",
496                               oci->no_table.bv_val, mysql_error(&ni->ni_sql), mysql_errno(&ni->ni_sql) );
497           }
498           return i;
499 }
500 
501 static int
ndb_oc_check(BackendDB * be,Ndb * ndb,struct berval * ocsin,NdbOcs * out)502 ndb_oc_check( BackendDB *be, Ndb *ndb,
503           struct berval *ocsin, NdbOcs *out )
504 {
505           struct ndb_info *ni = (struct ndb_info *) be->be_private;
506           const NdbDictionary::Dictionary *myDict = ndb->getDictionary();
507 
508           int i, rc = 0;
509 
510           out->no_ninfo = 0;
511           out->no_ntext = 0;
512           out->no_nitext = 0;
513 
514           /* Find all objectclasses and their superiors. List
515            * the superiors first.
516            */
517 
518           ldap_pvt_thread_rdwr_rlock( &ni->ni_oc_rwlock );
519           for ( i=0; !BER_BVISNULL( &ocsin[i] ); i++ ) {
520                     rc = ndb_oc_list( ni, myDict, &ocsin[i], 0, out );
521                     if ( rc ) break;
522           }
523           ldap_pvt_thread_rdwr_runlock( &ni->ni_oc_rwlock );
524           return rc;
525 }
526 
/* Per-value change codes computed in ndb_oc_attrs. They are tested as bit
 * flags there ("changed & V_INS"), so a replace (3) matches both the
 * insert bit and the delete bit.
 */
#define	V_INS	1
#define	V_DEL	2
#define	V_REP	(V_INS|V_DEL)

/* Set to 1 by ndb_oc_attrs when it obtains a blob handle while slapMode
 * has SLAP_TOOL_MODE set.
 * NOTE(review): the code that reads this flag is outside this chunk.
 */
static int ndb_flush_blobs;
532 
533 /* set all the unique attrs of this objectclass into the table
534  */
535 extern "C" int
ndb_oc_attrs(NdbTransaction * txn,const NdbDictionary::Table * myTable,Entry * e,NdbOcInfo * no,NdbAttrInfo ** attrs,int nattrs,Attribute * old)536 ndb_oc_attrs(
537           NdbTransaction *txn,
538           const NdbDictionary::Table *myTable,
539           Entry *e,
540           NdbOcInfo *no,
541           NdbAttrInfo **attrs,
542           int nattrs,
543           Attribute *old
544 )
545 {
546           char buf[65538], *ptr;
547           Attribute **an, **ao, *a;
548           NdbOperation *myop;
549           int i, j, max = 0;
550           int changed, rc;
551           Uint64 eid = e->e_id;
552 
553           if ( !nattrs )
554                     return 0;
555 
556           an = (Attribute **)ch_malloc( 2 * nattrs * sizeof(Attribute *));
557           ao = an + nattrs;
558 
559           /* Turn lists of attrs into arrays for easier access */
560           for ( i=0; i<nattrs; i++ ) {
561                     if ( attrs[i]->na_oi != no ) {
562                               an[i] = NULL;
563                               ao[i] = NULL;
564                               continue;
565                     }
566                     for ( a=e->e_attrs; a; a=a->a_next ) {
567                               if ( a->a_desc == slap_schema.si_ad_objectClass )
568                                         continue;
569                               if ( a->a_desc->ad_type == attrs[i]->na_attr ) {
570                                         /* Don't process same attr twice */
571                                         if ( a->a_flags & SLAP_ATTR_IXADD )
572                                                   a = NULL;
573                                         else
574                                                   a->a_flags |= SLAP_ATTR_IXADD;
575                                         break;
576                               }
577                     }
578                     an[i] = a;
579                     if ( a && a->a_numvals > max )
580                               max = a->a_numvals;
581                     for ( a=old; a; a=a->a_next ) {
582                               if ( a->a_desc == slap_schema.si_ad_objectClass )
583                                         continue;
584                               if ( a->a_desc->ad_type == attrs[i]->na_attr )
585                                         break;
586                     }
587                     ao[i] = a;
588                     if ( a && a->a_numvals > max )
589                               max = a->a_numvals;
590           }
591 
592           for ( i=0; i<max; i++ ) {
593                     myop = NULL;
594                     for ( j=0; j<nattrs; j++ ) {
595                               if ( !an[j] && !ao[j] )
596                                         continue;
597                               changed = 0;
598                               if ( an[j] && an[j]->a_numvals > i ) {
599                                         /* both old and new are present, compare for changes */
600                                         if ( ao[j] && ao[j]->a_numvals > i ) {
601                                                   if ( ber_bvcmp( &ao[j]->a_nvals[i], &an[j]->a_nvals[i] ))
602                                                             changed = V_REP;
603                                         } else {
604                                                   changed = V_INS;
605                                         }
606                               } else {
607                                         if ( ao[j] && ao[j]->a_numvals > i )
608                                                   changed = V_DEL;
609                               }
610                               if ( changed ) {
611                                         if ( !myop ) {
612                                                   rc = LDAP_OTHER;
613                                                   myop = txn->getNdbOperation( myTable );
614                                                   if ( !myop ) {
615                                                             goto done;
616                                                   }
617                                                   if ( old ) {
618                                                             if ( myop->writeTuple()) {
619                                                                       goto done;
620                                                             }
621                                                   } else {
622                                                             if ( myop->insertTuple()) {
623                                                                       goto done;
624                                                             }
625                                                   }
626                                                   if ( myop->equal( EID_COLUMN, eid )) {
627                                                             goto done;
628                                                   }
629                                                   if ( myop->equal( VID_COLUMN, i )) {
630                                                             goto done;
631                                                   }
632                                         }
633                                         if ( attrs[j]->na_flag & NDB_INFO_ATBLOB ) {
634                                                   NdbBlob *myBlob = myop->getBlobHandle( attrs[j]->na_column );
635                                                   rc = LDAP_OTHER;
636                                                   if ( !myBlob ) {
637                                                             Debug( LDAP_DEBUG_TRACE, "ndb_oc_attrs: getBlobHandle failed %s (%d)\n",
638                                                                       myop->getNdbError().message, myop->getNdbError().code, 0 );
639                                                             goto done;
640                                                   }
641                                                   if ( slapMode & SLAP_TOOL_MODE )
642                                                             ndb_flush_blobs = 1;
643                                                   if ( changed & V_INS ) {
644                                                             if ( myBlob->setValue( an[j]->a_vals[i].bv_val, an[j]->a_vals[i].bv_len )) {
645                                                                       Debug( LDAP_DEBUG_TRACE, "ndb_oc_attrs: blob->setValue failed %s (%d)\n",
646                                                                                 myBlob->getNdbError().message, myBlob->getNdbError().code, 0 );
647                                                                       goto done;
648                                                             }
649                                                   } else {
650                                                             if ( myBlob->setValue( NULL, 0 )) {
651                                                                       Debug( LDAP_DEBUG_TRACE, "ndb_oc_attrs: blob->setValue failed %s (%d)\n",
652                                                                                 myBlob->getNdbError().message, myBlob->getNdbError().code, 0 );
653                                                                       goto done;
654                                                             }
655                                                   }
656                                         } else {
657                                                   if ( changed & V_INS ) {
658                                                             if ( an[j]->a_vals[i].bv_len > attrs[j]->na_len ) {
659                                                                       Debug( LDAP_DEBUG_ANY, "ndb_oc_attrs: attribute %s too long for column\n",
660                                                                                 attrs[j]->na_name.bv_val, 0, 0 );
661                                                                       rc = LDAP_CONSTRAINT_VIOLATION;
662                                                                       goto done;
663                                                             }
664                                                             ptr = buf;
665                                                             *ptr++ = an[j]->a_vals[i].bv_len & 0xff;
666                                                             if ( attrs[j]->na_len > 255 ) {
667                                                                       /* MedVar */
668                                                                       *ptr++ = an[j]->a_vals[i].bv_len >> 8;
669                                                             }
670                                                             memcpy( ptr, an[j]->a_vals[i].bv_val, an[j]->a_vals[i].bv_len );
671                                                             ptr = buf;
672                                                   } else {
673                                                             ptr = NULL;
674                                                   }
675                                                   if ( myop->setValue( attrs[j]->na_column, ptr )) {
676                                                             rc = LDAP_OTHER;
677                                                             goto done;
678                                                   }
679                                         }
680                               }
681                     }
682           }
683           rc = LDAP_SUCCESS;
684 done:
685           ch_free( an );
686           if ( rc ) {
687                     Debug( LDAP_DEBUG_TRACE, "ndb_oc_attrs: failed %s (%d)\n",
688                               myop->getNdbError().message, myop->getNdbError().code, 0 );
689           }
690           return rc;
691 }
692 
693 static int
ndb_oc_put(const NdbDictionary::Dictionary * myDict,NdbTransaction * txn,NdbOcInfo * no,Entry * e)694 ndb_oc_put(
695           const NdbDictionary::Dictionary *myDict,
696           NdbTransaction *txn, NdbOcInfo *no, Entry *e )
697 {
698           const NdbDictionary::Table *myTable;
699           int i, rc;
700 
701           for ( i=0; i<no->no_nsets; i++ ) {
702                     rc = ndb_oc_put( myDict, txn, no->no_sets[i], e );
703                     if ( rc )
704                               return rc;
705           }
706 
707           myTable = myDict->getTable( no->no_table.bv_val );
708           if ( !myTable )
709                     return LDAP_OTHER;
710 
711           return ndb_oc_attrs( txn, myTable, e, no, no->no_attrs, no->no_nattrs, NULL );
712 }
713 
714 /* This is now only used for Adds. Modifies call ndb_oc_attrs directly. */
715 extern "C" int
ndb_entry_put_data(BackendDB * be,NdbArgs * NA)716 ndb_entry_put_data(
717           BackendDB *be,
718           NdbArgs *NA
719 )
720 {
721           struct ndb_info *ni = (struct ndb_info *) be->be_private;
722           Attribute *aoc;
723           const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
724           NdbOcs myOcs;
725           int i, rc;
726 
727           /* Get the entry's objectClass attribute */
728           aoc = attr_find( NA->e->e_attrs, slap_schema.si_ad_objectClass );
729           if ( !aoc )
730                     return LDAP_OTHER;
731 
732           ndb_oc_check( be, NA->ndb, aoc->a_nvals, &myOcs );
733           myOcs.no_info[myOcs.no_ninfo++] = ni->ni_opattrs;
734 
735           /* Walk thru objectclasses, find all the attributes belonging to a class */
736           for ( i=0; i<myOcs.no_ninfo; i++ ) {
737                     rc = ndb_oc_put( myDict, NA->txn, myOcs.no_info[i], NA->e );
738                     if ( rc ) return rc;
739           }
740 
741           /* slapadd tries to batch multiple entries per txn, but entry data is
742            * transient and blob data is required to remain valid for the whole txn.
743            * So we need to flush blobs before their source data disappears.
744            */
745           if (( slapMode & SLAP_TOOL_MODE ) && ndb_flush_blobs )
746                     NA->txn->execute( NdbTransaction::NoCommit );
747 
748           return 0;
749 }
750 
751 static void
ndb_oc_get(Operation * op,NdbOcInfo * no,int * j,int * nocs,NdbOcInfo *** oclist)752 ndb_oc_get( Operation *op, NdbOcInfo *no, int *j, int *nocs, NdbOcInfo ***oclist )
753 {
754           int i;
755           NdbOcInfo  **ol2;
756 
757           for ( i=0; i<no->no_nsets; i++ ) {
758                     ndb_oc_get( op, no->no_sets[i], j, nocs, oclist );
759           }
760 
761           /* Don't insert twice */
762           ol2 = *oclist;
763           for ( i=0; i<*j; i++ )
764                     if ( ol2[i] == no )
765                               return;
766 
767           if ( *j >= *nocs ) {
768                     *nocs *= 2;
769                     ol2 = (NdbOcInfo **)op->o_tmprealloc( *oclist, *nocs * sizeof(NdbOcInfo *), op->o_tmpmemctx );
770                     *oclist = ol2;
771           }
772           ol2 = *oclist;
773           ol2[(*j)++] = no;
774 }
775 
776 /* Retrieve attribute data for given entry. The entry's DN and eid should
777  * already be populated.
778  */
779 extern "C" int
ndb_entry_get_data(Operation * op,NdbArgs * NA,int update)780 ndb_entry_get_data(
781           Operation *op,
782           NdbArgs *NA,
783           int update
784 )
785 {
786           struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
787           const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
788           const NdbDictionary::Table *myTable;
789           NdbIndexScanOperation **myop = NULL;
790           Uint64 eid;
791 
792           Attribute *a;
793           NdbOcs myOcs;
794           NdbOcInfo *oci, **oclist = NULL;
795           char abuf[65536], *ptr, **attrs = NULL;
796           struct berval bv[2];
797           int *ocx = NULL;
798 
799           /* FIXME: abuf should be dynamically allocated */
800 
801           int i, j, k, nocs, nattrs, rc = LDAP_OTHER;
802 
803           eid = NA->e->e_id;
804 
805           ndb_oc_check( op->o_bd, NA->ndb, NA->ocs, &myOcs );
806           myOcs.no_info[myOcs.no_ninfo++] = ni->ni_opattrs;
807           nocs = myOcs.no_ninfo;
808 
809           oclist = (NdbOcInfo **)op->o_tmpcalloc( 1, nocs * sizeof(NdbOcInfo *), op->o_tmpmemctx );
810 
811           for ( i=0, j=0; i<myOcs.no_ninfo; i++ ) {
812                     ndb_oc_get( op, myOcs.no_info[i], &j, &nocs, &oclist );
813           }
814 
815           nocs = j;
816           nattrs = 0;
817           for ( i=0; i<nocs; i++ )
818                     nattrs += oclist[i]->no_nattrs;
819 
820           ocx = (int *)op->o_tmpalloc( nocs * sizeof(int), op->o_tmpmemctx );
821 
822           attrs = (char **)op->o_tmpalloc( nattrs * sizeof(char *), op->o_tmpmemctx );
823 
824           myop = (NdbIndexScanOperation **)op->o_tmpalloc( nattrs * sizeof(NdbIndexScanOperation *), op->o_tmpmemctx );
825 
826           k = 0;
827           ptr = abuf;
828           for ( i=0; i<nocs; i++ ) {
829                     oci = oclist[i];
830 
831                     myop[i] = NA->txn->getNdbIndexScanOperation( "PRIMARY", oci->no_table.bv_val );
832                     if ( !myop[i] )
833                               goto leave;
834                     if ( myop[i]->readTuples( update ? NdbOperation::LM_Exclusive : NdbOperation::LM_CommittedRead ))
835                               goto leave;
836                     if ( myop[i]->setBound( 0U, NdbIndexScanOperation::BoundEQ, &eid ))
837                               goto leave;
838 
839                     for ( j=0; j<oci->no_nattrs; j++ ) {
840                               if ( oci->no_attrs[j]->na_oi != oci )
841                                         continue;
842                               if ( oci->no_attrs[j]->na_flag & NDB_INFO_ATBLOB ) {
843                                         NdbBlob *bi = myop[i]->getBlobHandle( oci->no_attrs[j]->na_column );
844                                         attrs[k++] = (char *)bi;
845                               } else {
846                                         attrs[k] = ptr;
847                                         *ptr++ = 0;
848                                         if ( oci->no_attrs[j]->na_len > 255 )
849                                                   *ptr++ = 0;
850                                         ptr += oci->no_attrs[j]->na_len + 1;
851                                         myop[i]->getValue( oci->no_attrs[j]->na_column, attrs[k++] );
852                               }
853                     }
854                     ocx[i] = k;
855           }
856           /* Must use IgnoreError, because an entry with multiple objectClasses may not
857            * actually have attributes defined in each class / table.
858            */
859           if ( NA->txn->execute( NdbTransaction::NoCommit, NdbOperation::AO_IgnoreError, 1) < 0 )
860                     goto leave;
861 
862           /* count results */
863           for ( i=0; i<nocs; i++ ) {
864                     if (( j = myop[i]->nextResult(true) )) {
865                               if ( j < 0 ) {
866                                         Debug( LDAP_DEBUG_TRACE,
867                                                   "ndb_entry_get_data: first nextResult(%d) failed: %s (%d)\n",
868                                                   i, myop[i]->getNdbError().message, myop[i]->getNdbError().code );
869                               }
870                               myop[i] = NULL;
871                     }
872           }
873 
874           nattrs = 0;
875           k = 0;
876           for ( i=0; i<nocs; i++ ) {
877                     oci = oclist[i];
878                     for ( j=0; j<oci->no_nattrs; j++ ) {
879                               unsigned char *buf;
880                               int len;
881                               if ( oci->no_attrs[j]->na_oi != oci )
882                                         continue;
883                               if ( !myop[i] ) {
884                                         attrs[k] = NULL;
885                               } else if ( oci->no_attrs[j]->na_flag & NDB_INFO_ATBLOB ) {
886                                         void *vi = attrs[k];
887                                         NdbBlob *bi = (NdbBlob *)vi;
888                                         int isNull;
889                                         bi->getNull( isNull );
890                                         if ( !isNull ) {
891                                                   nattrs++;
892                                         } else {
893                                                   attrs[k] = NULL;
894                                         }
895                               } else {
896                                         buf = (unsigned char *)attrs[k];
897                                         len = buf[0];
898                                         if ( oci->no_attrs[j]->na_len > 255 ) {
899                                                   /* MedVar */
900                                                   len |= (buf[1] << 8);
901                                         }
902                                         if ( len ) {
903                                                   nattrs++;
904                                         } else {
905                                                   attrs[k] = NULL;
906                                         }
907                               }
908                               k++;
909                     }
910           }
911 
912           a = attrs_alloc( nattrs+1 );
913           NA->e->e_attrs = a;
914 
915           a->a_desc = slap_schema.si_ad_objectClass;
916           a->a_vals = NULL;
917           ber_bvarray_dup_x( &a->a_vals, NA->ocs, NULL );
918           a->a_nvals = a->a_vals;
919           a->a_numvals = myOcs.no_ntext;
920 
921           BER_BVZERO( &bv[1] );
922 
923           do {
924                     a = NA->e->e_attrs->a_next;
925                     k = 0;
926                     for ( i=0; i<nocs; k=ocx[i], i++ ) {
927                               oci = oclist[i];
928                               for ( j=0; j<oci->no_nattrs; j++ ) {
929                                         unsigned char *buf;
930                                         struct berval nbv;
931                                         if ( oci->no_attrs[j]->na_oi != oci )
932                                                   continue;
933                                         buf = (unsigned char *)attrs[k++];
934                                         if ( !buf )
935                                                   continue;
936                                         if ( !myop[i] ) {
937                                                   a=a->a_next;
938                                                   continue;
939                                         }
940                                         if ( oci->no_attrs[j]->na_flag & NDB_INFO_ATBLOB ) {
941                                                   void *vi = (void *)buf;
942                                                   NdbBlob *bi = (NdbBlob *)vi;
943                                                   Uint64 len;
944                                                   Uint32 len2;
945                                                   int isNull;
946                                                   bi->getNull( isNull );
947                                                   if ( isNull ) {
948                                                             a = a->a_next;
949                                                             continue;
950                                                   }
951                                                   bi->getLength( len );
952                                                   bv[0].bv_len = len;
953                                                   bv[0].bv_val = (char *)ch_malloc( len+1 );
954                                                   len2 = len;
955                                                   if ( bi->readData( bv[0].bv_val, len2 )) {
956                                                             Debug( LDAP_DEBUG_TRACE,
957                                                                       "ndb_entry_get_data: blob readData failed: %s (%d), len %d\n",
958                                                                       bi->getNdbError().message, bi->getNdbError().code, len2 );
959                                                   }
960                                                   bv[0].bv_val[len] = '\0';
961                                                   ber_bvarray_add_x( &a->a_vals, bv, NULL );
962                                         } else {
963                                                   bv[0].bv_len = buf[0];
964                                                   if ( oci->no_attrs[j]->na_len > 255 ) {
965                                                             /* MedVar */
966                                                             bv[0].bv_len |= (buf[1] << 8);
967                                                             bv[0].bv_val = (char *)buf+2;
968                                                             buf[1] = 0;
969                                                   } else {
970                                                             bv[0].bv_val = (char *)buf+1;
971                                                   }
972                                                   buf[0] = 0;
973                                                   if ( bv[0].bv_len == 0 ) {
974                                                             a = a->a_next;
975                                                             continue;
976                                                   }
977                                                   bv[0].bv_val[bv[0].bv_len] = '\0';
978                                                   value_add_one( &a->a_vals, bv );
979                                         }
980                                         a->a_desc = oci->no_attrs[j]->na_desc;
981                                         attr_normalize_one( a->a_desc, bv, &nbv, NULL );
982                                         a->a_numvals++;
983                                         if ( !BER_BVISNULL( &nbv )) {
984                                                   ber_bvarray_add_x( &a->a_nvals, &nbv, NULL );
985                                         } else if ( !a->a_nvals ) {
986                                                   a->a_nvals = a->a_vals;
987                                         }
988                                         a = a->a_next;
989                               }
990                     }
991                     k = 0;
992                     for ( i=0; i<nocs; i++ ) {
993                               if ( !myop[i] )
994                                         continue;
995                               if ((j = myop[i]->nextResult(true))) {
996                                         if ( j < 0 ) {
997                                                   Debug( LDAP_DEBUG_TRACE,
998                                                             "ndb_entry_get_data: last nextResult(%d) failed: %s (%d)\n",
999                                                             i, myop[i]->getNdbError().message, myop[i]->getNdbError().code );
1000                                         }
1001                                         myop[i] = NULL;
1002                               } else {
1003                                         k = 1;
1004                               }
1005                     }
1006           } while ( k );
1007 
1008           rc = 0;
1009 leave:
1010           if ( myop ) {
1011                     op->o_tmpfree( myop, op->o_tmpmemctx );
1012           }
1013           if ( attrs ) {
1014                     op->o_tmpfree( attrs, op->o_tmpmemctx );
1015           }
1016           if ( ocx ) {
1017                     op->o_tmpfree( ocx, op->o_tmpmemctx );
1018           }
1019           if ( oclist ) {
1020                     op->o_tmpfree( oclist, op->o_tmpmemctx );
1021           }
1022 
1023           return rc;
1024 }
1025 
1026 static int
ndb_oc_del(NdbTransaction * txn,Uint64 eid,NdbOcInfo * no)1027 ndb_oc_del(
1028           NdbTransaction *txn, Uint64 eid, NdbOcInfo *no )
1029 {
1030           NdbIndexScanOperation *myop;
1031           int i, rc;
1032 
1033           for ( i=0; i<no->no_nsets; i++ ) {
1034                     rc = ndb_oc_del( txn, eid, no->no_sets[i] );
1035                     if ( rc ) return rc;
1036           }
1037 
1038           myop = txn->getNdbIndexScanOperation( "PRIMARY", no->no_table.bv_val );
1039           if ( !myop )
1040                     return LDAP_OTHER;
1041           if ( myop->readTuples( NdbOperation::LM_Exclusive ))
1042                     return LDAP_OTHER;
1043           if ( myop->setBound( 0U, NdbIndexScanOperation::BoundEQ, &eid ))
1044                     return LDAP_OTHER;
1045 
1046           txn->execute(NoCommit);
1047           while ( myop->nextResult(true) == 0) {
1048                     do {
1049                               myop->deleteCurrentTuple();
1050                     } while (myop->nextResult(false) == 0);
1051                     txn->execute(NoCommit);
1052           }
1053 
1054           return 0;
1055 }
1056 
1057 extern "C" int
ndb_entry_del_data(BackendDB * be,NdbArgs * NA)1058 ndb_entry_del_data(
1059           BackendDB *be,
1060           NdbArgs *NA
1061 )
1062 {
1063           struct ndb_info *ni = (struct ndb_info *) be->be_private;
1064           Uint64 eid = NA->e->e_id;
1065           int i;
1066           NdbOcs myOcs;
1067 
1068           ndb_oc_check( be, NA->ndb, NA->ocs, &myOcs );
1069           myOcs.no_info[myOcs.no_ninfo++] = ni->ni_opattrs;
1070 
1071           for ( i=0; i<myOcs.no_ninfo; i++ ) {
1072                     if ( ndb_oc_del( NA->txn, eid, myOcs.no_info[i] ))
1073                               return LDAP_OTHER;
1074           }
1075 
1076           return 0;
1077 }
1078 
1079 extern "C" int
ndb_dn2rdns(struct berval * dn,NdbRdns * rdns)1080 ndb_dn2rdns(
1081           struct berval *dn,
1082           NdbRdns *rdns
1083 )
1084 {
1085           char *beg, *end;
1086           int i, len;
1087 
1088           /* Walk thru RDNs */
1089           end = dn->bv_val + dn->bv_len;
1090           for ( i=0; i<NDB_MAX_RDNS; i++ ) {
1091                     for ( beg = end-1; beg > dn->bv_val; beg-- ) {
1092                               if (*beg == ',') {
1093                                         beg++;
1094                                         break;
1095                               }
1096                     }
1097                     if ( beg >= dn->bv_val ) {
1098                               len = end - beg;
1099                               /* RDN is too long */
1100                               if ( len > NDB_RDN_LEN )
1101                                         return LDAP_CONSTRAINT_VIOLATION;
1102                               memcpy( rdns->nr_buf[i]+1, beg, len );
1103                     } else {
1104                               break;
1105                     }
1106                     rdns->nr_buf[i][0] = len;
1107                     end = beg - 1;
1108           }
1109           /* Too many RDNs in DN */
1110           if ( i == NDB_MAX_RDNS && beg > dn->bv_val ) {
1111                               return LDAP_CONSTRAINT_VIOLATION;
1112           }
1113           rdns->nr_num = i;
1114           return 0;
1115 }
1116 
1117 static int
ndb_rdns2keys(NdbOperation * myop,NdbRdns * rdns)1118 ndb_rdns2keys(
1119           NdbOperation *myop,
1120           NdbRdns *rdns
1121 )
1122 {
1123           int i;
1124           char dummy[2] = {0,0};
1125 
1126           /* Walk thru RDNs */
1127           for ( i=0; i<rdns->nr_num; i++ ) {
1128                     if ( myop->equal( i+RDN_COLUMN, rdns->nr_buf[i] ))
1129                               return LDAP_OTHER;
1130           }
1131           for ( ; i<NDB_MAX_RDNS; i++ ) {
1132                     if ( myop->equal( i+RDN_COLUMN, dummy ))
1133                               return LDAP_OTHER;
1134           }
1135           return 0;
1136 }
1137 
1138 /* Store the DN2ID_TABLE fields */
1139 extern "C" int
ndb_entry_put_info(BackendDB * be,NdbArgs * NA,int update)1140 ndb_entry_put_info(
1141           BackendDB *be,
1142           NdbArgs *NA,
1143           int update
1144 )
1145 {
1146           struct ndb_info *ni = (struct ndb_info *) be->be_private;
1147           const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
1148           const NdbDictionary::Table *myTable = myDict->getTable( DN2ID_TABLE );
1149           NdbOperation *myop;
1150           NdbAttrInfo *ai;
1151           Attribute *aoc, *a;
1152 
1153           /* Get the entry's objectClass attribute; it's ok to be
1154            * absent on a fresh insert
1155            */
1156           aoc = attr_find( NA->e->e_attrs, slap_schema.si_ad_objectClass );
1157           if ( update && !aoc )
1158                     return LDAP_OBJECT_CLASS_VIOLATION;
1159 
1160           myop = NA->txn->getNdbOperation( myTable );
1161           if ( !myop )
1162                     return LDAP_OTHER;
1163           if ( update ) {
1164                     if ( myop->updateTuple())
1165                               return LDAP_OTHER;
1166           } else {
1167                     if ( myop->insertTuple())
1168                               return LDAP_OTHER;
1169           }
1170 
1171           if ( ndb_rdns2keys( myop, NA->rdns ))
1172                     return LDAP_OTHER;
1173 
1174           /* Set entry ID */
1175           {
1176                     Uint64 eid = NA->e->e_id;
1177                     if ( myop->setValue( EID_COLUMN, eid ))
1178                               return LDAP_OTHER;
1179           }
1180 
1181           /* Set list of objectClasses */
1182           /* List is <sp> <class> <sp> <class> <sp> ... so that
1183            * searches for " class " will yield accurate results
1184            */
1185           if ( aoc ) {
1186                     char *ptr, buf[sizeof(MedVar)];
1187                     NdbOcs myOcs;
1188                     int i;
1189 
1190                     ndb_oc_check( be, NA->ndb, aoc->a_nvals, &myOcs );
1191                     ptr = buf+2;
1192                     *ptr++ = ' ';
1193                     for ( i=0; i<myOcs.no_ntext; i++ ) {
1194                               /* data loss... */
1195                               if ( ptr + myOcs.no_text[i].bv_len + 1 >= &buf[sizeof(buf)] )
1196                                         break;
1197                               ptr = lutil_strcopy( ptr, myOcs.no_text[i].bv_val );
1198                               *ptr++ = ' ';
1199                     }
1200 
1201                     /* implicit classes */
1202                     if ( myOcs.no_nitext ) {
1203                               *ptr++ = '@';
1204                               *ptr++ = ' ';
1205                               for ( i=0; i<myOcs.no_nitext; i++ ) {
1206                                         /* data loss... */
1207                                         if ( ptr + myOcs.no_itext[i].bv_len + 1 >= &buf[sizeof(buf)] )
1208                                                   break;
1209                                         ptr = lutil_strcopy( ptr, myOcs.no_itext[i].bv_val );
1210                                         *ptr++ = ' ';
1211                               }
1212                     }
1213 
1214                     i = ptr - buf - 2;
1215                     buf[0] = i & 0xff;
1216                     buf[1] = i >> 8;
1217                     if ( myop->setValue( OCS_COLUMN, buf ))
1218                               return LDAP_OTHER;
1219           }
1220 
1221           /* Set any indexed attrs */
1222           for ( a = NA->e->e_attrs; a; a=a->a_next ) {
1223                     ai = ndb_ai_find( ni, a->a_desc->ad_type );
1224                     if ( ai && ( ai->na_flag & NDB_INFO_INDEX )) {
1225                               char *ptr, buf[sizeof(MedVar)];
1226                               int len;
1227 
1228                               ptr = buf+1;
1229                               len = a->a_vals[0].bv_len;
1230                               /* FIXME: data loss */
1231                               if ( len > ai->na_len )
1232                                         len = ai->na_len;
1233                               buf[0] = len & 0xff;
1234                               if ( ai->na_len > 255 ) {
1235                                         *ptr++ = len >> 8;
1236                               }
1237                               memcpy( ptr, a->a_vals[0].bv_val, len );
1238                               if ( myop->setValue( ai->na_ixcol, buf ))
1239                                         return LDAP_OTHER;
1240                     }
1241           }
1242 
1243           return 0;
1244 }
1245 
1246 extern "C" struct berval *
ndb_str2bvarray(char * str,int len,char delim,void * ctx)1247 ndb_str2bvarray(
1248           char *str,
1249           int len,
1250           char delim,
1251           void *ctx
1252 )
1253 {
1254           struct berval *list, tmp;
1255           char *beg;
1256           int i, num;
1257 
1258           while ( *str == delim ) {
1259                     str++;
1260                     len--;
1261           }
1262 
1263           while ( str[len-1] == delim ) {
1264                     str[--len] = '\0';
1265           }
1266 
1267           for ( i = 1, beg = str;; i++ ) {
1268                     beg = strchr( beg, delim );
1269                     if ( !beg )
1270                               break;
1271                     if ( beg >= str + len )
1272                               break;
1273                     beg++;
1274           }
1275 
1276           num = i;
1277           list = (struct berval *)slap_sl_malloc( (num+1)*sizeof(struct berval), ctx);
1278 
1279           for ( i = 0, beg = str; i<num; i++ ) {
1280                     tmp.bv_val = beg;
1281                     beg = strchr( beg, delim );
1282                     if ( beg >= str + len )
1283                               beg = NULL;
1284                     if ( beg ) {
1285                               tmp.bv_len = beg - tmp.bv_val;
1286                     } else {
1287                               tmp.bv_len = len - (tmp.bv_val - str);
1288                     }
1289                     ber_dupbv_x( &list[i], &tmp, ctx );
1290                     beg++;
1291           }
1292 
1293           BER_BVZERO( &list[i] );
1294           return list;
1295 }
1296 
1297 extern "C" struct berval *
ndb_ref2oclist(const char * ref,void * ctx)1298 ndb_ref2oclist(
1299           const char *ref,
1300           void *ctx
1301 )
1302 {
1303           char *implied;
1304 
1305           /* MedVar */
1306           int len = ref[0] | (ref[1] << 8);
1307 
1308           /* don't return the implied classes */
1309           implied = (char *)memchr( ref+2, '@', len );
1310           if ( implied ) {
1311                     len = implied - ref - 2;
1312                     *implied = '\0';
1313           }
1314 
1315           return ndb_str2bvarray( (char *)ref+2, len, ' ', ctx );
1316 }
1317 
1318 /* Retrieve the DN2ID_TABLE fields. Can call with NULL ocs if just verifying
1319  * the existence of a DN.
1320  */
1321 extern "C" int
ndb_entry_get_info(Operation * op,NdbArgs * NA,int update,struct berval * matched)1322 ndb_entry_get_info(
1323           Operation *op,
1324           NdbArgs *NA,
1325           int update,
1326           struct berval *matched
1327 )
1328 {
1329           struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
1330           const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
1331           const NdbDictionary::Table *myTable = myDict->getTable( DN2ID_TABLE );
1332           NdbOperation *myop[NDB_MAX_RDNS];
1333           NdbRecAttr *eid[NDB_MAX_RDNS], *oc[NDB_MAX_RDNS];
1334           char idbuf[NDB_MAX_RDNS][2*sizeof(ID)];
1335           char ocbuf[NDB_MAX_RDNS][NDB_OC_BUFLEN];
1336 
1337           if ( matched ) {
1338                     BER_BVZERO( matched );
1339           }
1340           if ( !myTable ) {
1341                     return LDAP_OTHER;
1342           }
1343 
1344           myop[0] = NA->txn->getNdbOperation( myTable );
1345           if ( !myop[0] ) {
1346                     return LDAP_OTHER;
1347           }
1348 
1349           if ( myop[0]->readTuple( update ? NdbOperation::LM_Exclusive : NdbOperation::LM_CommittedRead )) {
1350                     return LDAP_OTHER;
1351           }
1352 
1353           if ( !NA->rdns->nr_num && ndb_dn2rdns( &NA->e->e_name, NA->rdns )) {
1354                     return LDAP_NO_SUCH_OBJECT;
1355           }
1356 
1357           if ( ndb_rdns2keys( myop[0], NA->rdns )) {
1358                     return LDAP_OTHER;
1359           }
1360 
1361           eid[0] = myop[0]->getValue( EID_COLUMN, idbuf[0] );
1362           if ( !eid[0] ) {
1363                     return LDAP_OTHER;
1364           }
1365 
1366           ocbuf[0][0] = 0;
1367           ocbuf[0][1] = 0;
1368           if ( !NA->ocs ) {
1369                     oc[0] = myop[0]->getValue( OCS_COLUMN, ocbuf[0] );
1370                     if ( !oc[0] ) {
1371                               return LDAP_OTHER;
1372                     }
1373           }
1374 
1375           if ( NA->txn->execute(NdbTransaction::NoCommit, NdbOperation::AO_IgnoreError, 1) < 0 ) {
1376                     return LDAP_OTHER;
1377           }
1378 
1379           switch( myop[0]->getNdbError().code ) {
1380           case 0:
1381                     if ( !eid[0]->isNULL() && ( NA->e->e_id = eid[0]->u_64_value() )) {
1382                               /* If we didn't care about OCs, or we got them */
1383                               if ( NA->ocs || ocbuf[0][0] || ocbuf[0][1] ) {
1384                                         /* If wanted, return them */
1385                                         if ( !NA->ocs )
1386                                                   NA->ocs = ndb_ref2oclist( ocbuf[0], op->o_tmpmemctx );
1387                                         break;
1388                               }
1389                     }
1390                     /* FALLTHRU */
1391           case NDB_NO_SUCH_OBJECT:      /* no such tuple: look for closest parent */
1392                     if ( matched ) {
1393                               int i, j, k;
1394                               char dummy[2] = {0,0};
1395 
1396                               /* get to last RDN, then back up 1 */
1397                               k = NA->rdns->nr_num - 1;
1398 
1399                               for ( i=0; i<k; i++ ) {
1400                                         myop[i] = NA->txn->getNdbOperation( myTable );
1401                                         if ( !myop[i] )
1402                                                   return LDAP_OTHER;
1403                                         if ( myop[i]->readTuple( NdbOperation::LM_CommittedRead ))
1404                                                   return LDAP_OTHER;
1405                                         for ( j=0; j<=i; j++ ) {
1406                                                   if ( myop[i]->equal( j+RDN_COLUMN, NA->rdns->nr_buf[j] ))
1407                                                             return LDAP_OTHER;
1408                                         }
1409                                         for ( ;j<NDB_MAX_RDNS; j++ ) {
1410                                                   if ( myop[i]->equal( j+RDN_COLUMN, dummy ))
1411                                                             return LDAP_OTHER;
1412                                         }
1413                                         eid[i] = myop[i]->getValue( EID_COLUMN, idbuf[i] );
1414                                         if ( !eid[i] ) {
1415                                                   return LDAP_OTHER;
1416                                         }
1417                                         ocbuf[i][0] = 0;
1418                                         ocbuf[i][1] = 0;
1419                                         if ( !NA->ocs ) {
1420                                                   oc[i] = myop[0]->getValue( OCS_COLUMN, ocbuf[i] );
1421                                                   if ( !oc[i] ) {
1422                                                             return LDAP_OTHER;
1423                                                   }
1424                                         }
1425                               }
1426                               if ( NA->txn->execute(NdbTransaction::NoCommit, NdbOperation::AO_IgnoreError, 1) < 0 ) {
1427                                         return LDAP_OTHER;
1428                               }
1429                               for ( --i; i>=0; i-- ) {
1430                                         if ( myop[i]->getNdbError().code == 0 ) {
1431                                                   for ( j=0; j<=i; j++ )
1432                                                             matched->bv_len += NA->rdns->nr_buf[j][0];
1433                                                   NA->erdns = NA->rdns->nr_num;
1434                                                   NA->rdns->nr_num = j;
1435                                                   matched->bv_len += i;
1436                                                   matched->bv_val = NA->e->e_name.bv_val +
1437                                                             NA->e->e_name.bv_len - matched->bv_len;
1438                                                   if ( !eid[i]->isNULL() )
1439                                                             NA->e->e_id = eid[i]->u_64_value();
1440                                                   if ( !NA->ocs )
1441                                                             NA->ocs = ndb_ref2oclist( ocbuf[i], op->o_tmpmemctx );
1442                                                   break;
1443                                         }
1444                               }
1445                     }
1446                     return LDAP_NO_SUCH_OBJECT;
1447           default:
1448                     return LDAP_OTHER;
1449           }
1450 
1451           return 0;
1452 }
1453 
1454 extern "C" int
ndb_entry_del_info(BackendDB * be,NdbArgs * NA)1455 ndb_entry_del_info(
1456           BackendDB *be,
1457           NdbArgs *NA
1458 )
1459 {
1460           struct ndb_info *ni = (struct ndb_info *) be->be_private;
1461           const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
1462           const NdbDictionary::Table *myTable = myDict->getTable( DN2ID_TABLE );
1463           NdbOperation *myop;
1464 
1465           myop = NA->txn->getNdbOperation( myTable );
1466           if ( !myop )
1467                     return LDAP_OTHER;
1468           if ( myop->deleteTuple())
1469                     return LDAP_OTHER;
1470 
1471           if ( ndb_rdns2keys( myop, NA->rdns ))
1472                     return LDAP_OTHER;
1473 
1474           return 0;
1475 }
1476 
1477 extern "C" int
ndb_next_id(BackendDB * be,Ndb * ndb,ID * id)1478 ndb_next_id(
1479           BackendDB *be,
1480           Ndb *ndb,
1481           ID *id
1482 )
1483 {
1484           struct ndb_info *ni = (struct ndb_info *) be->be_private;
1485           const NdbDictionary::Dictionary *myDict = ndb->getDictionary();
1486           const NdbDictionary::Table *myTable = myDict->getTable( NEXTID_TABLE );
1487           Uint64 nid = 0;
1488           int rc;
1489 
1490           if ( !myTable ) {
1491                     Debug( LDAP_DEBUG_ANY, "ndb_next_id: " NEXTID_TABLE " table is missing\n",
1492                               0, 0, 0 );
1493                     return LDAP_OTHER;
1494           }
1495 
1496           rc = ndb->getAutoIncrementValue( myTable, nid, 1000 );
1497           if ( !rc )
1498                     *id = nid;
1499           return rc;
1500 }
1501 
1502 extern "C" { static void ndb_thread_hfree( void *key, void *data ); };
1503 static void
ndb_thread_hfree(void * key,void * data)1504 ndb_thread_hfree( void *key, void *data )
1505 {
1506           Ndb *ndb = (Ndb *)data;
1507           delete ndb;
1508 }
1509 
1510 extern "C" int
ndb_thread_handle(Operation * op,Ndb ** ndb)1511 ndb_thread_handle(
1512           Operation *op,
1513           Ndb **ndb )
1514 {
1515           struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
1516           void *data;
1517 
1518           if ( ldap_pvt_thread_pool_getkey( op->o_threadctx, ni, &data, NULL )) {
1519                     Ndb *myNdb;
1520                     int rc;
1521                     ldap_pvt_thread_mutex_lock( &ni->ni_conn_mutex );
1522                     myNdb = new Ndb( ni->ni_cluster[ni->ni_nextconn++], ni->ni_dbname );
1523                     if ( ni->ni_nextconn >= ni->ni_nconns )
1524                               ni->ni_nextconn = 0;
1525                     ldap_pvt_thread_mutex_unlock( &ni->ni_conn_mutex );
1526                     if ( !myNdb ) {
1527                               return LDAP_OTHER;
1528                     }
1529                     rc = myNdb->init(1024);
1530                     if ( rc ) {
1531                               delete myNdb;
1532                               Debug( LDAP_DEBUG_ANY, "ndb_thread_handle: err %d\n",
1533                                         rc, 0, 0 );
1534                               return rc;
1535                     }
1536                     data = (void *)myNdb;
1537                     if (( rc = ldap_pvt_thread_pool_setkey( op->o_threadctx, ni,
1538                               data, ndb_thread_hfree, NULL, NULL ))) {
1539                               delete myNdb;
1540                               Debug( LDAP_DEBUG_ANY, "ndb_thread_handle: err %d\n",
1541                                         rc, 0, 0 );
1542                               return rc;
1543                     }
1544           }
1545           *ndb = (Ndb *)data;
1546           return 0;
1547 }
1548 
1549 extern "C" int
ndb_entry_get(Operation * op,struct berval * ndn,ObjectClass * oc,AttributeDescription * ad,int rw,Entry ** ent)1550 ndb_entry_get(
1551           Operation *op,
1552           struct berval *ndn,
1553           ObjectClass *oc,
1554           AttributeDescription *ad,
1555           int rw,
1556           Entry **ent )
1557 {
1558           struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
1559           NdbArgs NA;
1560           Entry e = {0};
1561           int rc;
1562 
1563           /* Get our NDB handle */
1564           rc = ndb_thread_handle( op, &NA.ndb );
1565 
1566           NA.txn = NA.ndb->startTransaction();
1567           if( !NA.txn ) {
1568                     Debug( LDAP_DEBUG_TRACE,
1569                               LDAP_XSTRING(ndb_entry_get) ": startTransaction failed: %s (%d)\n",
1570                               NA.ndb->getNdbError().message, NA.ndb->getNdbError().code, 0 );
1571                     return 1;
1572           }
1573 
1574           e.e_name = *ndn;
1575           NA.e = &e;
1576           /* get entry */
1577           {
1578                     NdbRdns rdns;
1579                     rdns.nr_num = 0;
1580                     NA.ocs = NULL;
1581                     NA.rdns = &rdns;
1582                     rc = ndb_entry_get_info( op, &NA, rw, NULL );
1583           }
1584           if ( rc == 0 ) {
1585                     e.e_name = *ndn;
1586                     e.e_nname = *ndn;
1587                     rc = ndb_entry_get_data( op, &NA, 0 );
1588                     ber_bvarray_free( NA.ocs );
1589                     if ( rc == 0 ) {
1590                               if ( oc && !is_entry_objectclass_or_sub( &e, oc )) {
1591                                         attrs_free( e.e_attrs );
1592                                         rc = 1;
1593                               }
1594                     }
1595           }
1596           if ( rc == 0 ) {
1597                     *ent = entry_alloc();
1598                     **ent = e;
1599                     ber_dupbv( &(*ent)->e_name, ndn );
1600                     ber_dupbv( &(*ent)->e_nname, ndn );
1601           } else {
1602                     rc = 1;
1603           }
1604           NA.txn->close();
1605           return rc;
1606 }
1607 
1608 /* Congestion avoidance code
1609  * for Deadlock Rollback
1610  */
1611 
1612 extern "C" void
ndb_trans_backoff(int num_retries)1613 ndb_trans_backoff( int num_retries )
1614 {
1615           int i;
1616           int delay = 0;
1617           int pow_retries = 1;
1618           unsigned long key = 0;
1619           unsigned long max_key = -1;
1620           struct timeval timeout;
1621 
1622           lutil_entropy( (unsigned char *) &key, sizeof( unsigned long ));
1623 
1624           for ( i = 0; i < num_retries; i++ ) {
1625                     if ( i >= 5 ) break;
1626                     pow_retries *= 4;
1627           }
1628 
1629           delay = 16384 * (key * (double) pow_retries / (double) max_key);
1630           delay = delay ? delay : 1;
1631 
1632           Debug( LDAP_DEBUG_TRACE,  "delay = %d, num_retries = %d\n", delay, num_retries, 0 );
1633 
1634           timeout.tv_sec = delay / 1000000;
1635           timeout.tv_usec = delay % 1000000;
1636           select( 0, NULL, NULL, NULL, &timeout );
1637 }
1638 
1639 extern "C" void
ndb_check_referral(Operation * op,SlapReply * rs,NdbArgs * NA)1640 ndb_check_referral( Operation *op, SlapReply *rs, NdbArgs *NA )
1641 {
1642           struct berval dn, ndn;
1643           int i, dif;
1644           dif = NA->erdns - NA->rdns->nr_num;
1645 
1646           /* Set full DN of matched into entry */
1647           for ( i=0; i<dif; i++ ) {
1648                     dnParent( &NA->e->e_name, &dn );
1649                     dnParent( &NA->e->e_nname, &ndn );
1650                     NA->e->e_name = dn;
1651                     NA->e->e_nname = ndn;
1652           }
1653 
1654           /* return referral only if "disclose" is granted on the object */
1655           if ( access_allowed( op, NA->e, slap_schema.si_ad_entry,
1656                     NULL, ACL_DISCLOSE, NULL )) {
1657                     Attribute a;
1658                     for ( i=0; !BER_BVISNULL( &NA->ocs[i] ); i++ );
1659                     a.a_numvals = i;
1660                     a.a_desc = slap_schema.si_ad_objectClass;
1661                     a.a_vals = NA->ocs;
1662                     a.a_nvals = NA->ocs;
1663                     a.a_next = NULL;
1664                     NA->e->e_attrs = &a;
1665                     if ( is_entry_referral( NA->e )) {
1666                               NA->e->e_attrs = NULL;
1667                               ndb_entry_get_data( op, NA, 0 );
1668                               rs->sr_ref = get_entry_referrals( op, NA->e );
1669                               if ( rs->sr_ref ) {
1670                                         rs->sr_err = LDAP_REFERRAL;
1671                                         rs->sr_flags |= REP_REF_MUSTBEFREED;
1672                               }
1673                               attrs_free( NA->e->e_attrs );
1674                     }
1675                     NA->e->e_attrs = NULL;
1676           }
1677 }
1678