--- a/lock/lock_deadlock.c
+++ b/lock/lock_deadlock.c
@@ -121,7 +121,7 @@ __lock_detect(env, atype, rejectp)
 	DB_LOCKTAB *lt;
 	db_timespec now;
 	locker_info *idmap;
-	u_int32_t *bitmap, *copymap, **deadp, **free_me, *tmpmap;
+	u_int32_t *bitmap, *copymap, **deadp, **deadlist, *tmpmap;
 	u_int32_t i, cid, keeper, killid, limit, nalloc, nlockers;
 	u_int32_t lock_max, txn_max;
 	int ret, status;
@@ -133,7 +133,8 @@ __lock_detect(env, atype, rejectp)
 	if (IS_REP_CLIENT(env))
 		atype = DB_LOCK_MINWRITE;
 
-	free_me = NULL;
+	copymap = tmpmap = NULL;
+	deadlist = NULL;
 
 	lt = env->lk_handle;
 	if (rejectp != NULL)
@@ -179,11 +180,11 @@ __lock_detect(env, atype, rejectp)
 	memcpy(copymap, bitmap, nlockers * sizeof(u_int32_t) * nalloc);
 
 	if ((ret = __os_calloc(env, sizeof(u_int32_t), nalloc, &tmpmap)) != 0)
-		goto err1;
+		goto err;
 
 	/* Find a deadlock. */
 	if ((ret =
-	    __dd_find(env, bitmap, idmap, nlockers, nalloc, &deadp)) != 0)
+	    __dd_find(env, bitmap, idmap, nlockers, nalloc, &deadlist)) != 0)
 		return (ret);
 
 	/*
@@ -204,8 +205,7 @@ __lock_detect(env, atype, rejectp)
 	txn_max = TXN_MAXIMUM;
 
 	killid = BAD_KILLID;
-	free_me = deadp;
-	for (; *deadp != NULL; deadp++) {
+	for (deadp = deadlist; *deadp != NULL; deadp++) {
 		if (rejectp != NULL)
 			++*rejectp;
 		killid = (u_int32_t)(*deadp - bitmap) / nalloc;
@@ -342,11 +342,12 @@ dokill:	if (killid == BAD_KILLID) {
 		__db_msg(env, "Aborting locker %lx", (u_long)idmap[killid].id);
 	}
 
-	__os_free(env, tmpmap);
-err1:	__os_free(env, copymap);
-
-err:	if (free_me != NULL)
-		__os_free(env, free_me);
+err:	if (copymap != NULL)
+		__os_free(env, copymap);
+	if (deadlist != NULL)
+		__os_free(env, deadlist);
+	if (tmpmap != NULL)
+		__os_free(env, tmpmap);
 	__os_free(env, bitmap);
 	__os_free(env, idmap);
 
@@ -360,6 +361,17 @@ err:	if (free_me != NULL)
 
 #define	DD_INVALID_ID	((u_int32_t) -1)
 
+/*
+ * __dd_build --
+ *	Build the lock dependency bit maps.
+ *	Notes on synchronization:
+ *		LOCK_SYSTEM_LOCK is used to hold objects locked when we have
+ *		    a single partition.
+ *		LOCK_LOCKERS is held while we are walking the lockers list and
+ *		    to single thread the use of lockerp->dd_id.
+ *		LOCK_DD protects the DD list of objects.
+ */
+
 static int
 __dd_build(env, atype, bmp, nlockers, allocp, idmap, rejectp)
 	ENV *env;
@@ -393,6 +405,7 @@ __dd_build(env, atype, bmp, nlockers, al
 	 * In particular we do not build the conflict array and our caller
 	 * needs to expect this.
 	 */
+	LOCK_SYSTEM_LOCK(lt, region);
 	if (atype == DB_LOCK_EXPIRE) {
skip:		LOCK_DD(env, region);
 		op = SH_TAILQ_FIRST(&region->dd_objs, __db_lockobj);
@@ -430,17 +443,18 @@ skip:	LOCK_DD(env, region);
 			OBJECT_UNLOCK(lt, region, indx);
 		}
 		UNLOCK_DD(env, region);
+		LOCK_SYSTEM_UNLOCK(lt, region);
 		goto done;
 	}
 
 	/*
-	 * We'll check how many lockers there are, add a few more in for
-	 * good measure and then allocate all the structures.  Then we'll
-	 * verify that we have enough room when we go back in and get the
-	 * mutex the second time.
+	 * Allocate after locking the region so that the structures
+	 * are guaranteed to be large enough.
 	 */
-retry:	count = region->stat.st_nlockers;
+	LOCK_LOCKERS(env, region);
+	count = region->stat.st_nlockers;
 	if (count == 0) {
+		UNLOCK_LOCKERS(env, region);
 		*nlockers = 0;
 		return (0);
 	}
@@ -448,50 +462,37 @@ retry:	count = region->stat.st_nlockers;
 	if (FLD_ISSET(env->dbenv->verbose, DB_VERB_DEADLOCK))
 		__db_msg(env, "%lu lockers", (u_long)count);
 
-	count += 20;
 	nentries = (u_int32_t)DB_ALIGN(count, 32) / 32;
 
-	/*
-	 * Allocate enough space for a count by count bitmap matrix.
-	 *
-	 * XXX
-	 * We can probably save the malloc's between iterations just
-	 * reallocing if necessary because count grew by too much.
-	 */
+	/* Allocate enough space for a count by count bitmap matrix. */
 	if ((ret = __os_calloc(env, (size_t)count,
-	    sizeof(u_int32_t) * nentries, &bitmap)) != 0)
+	    sizeof(u_int32_t) * nentries, &bitmap)) != 0) {
+		UNLOCK_LOCKERS(env, region);
 		return (ret);
+	}
 
 	if ((ret = __os_calloc(env,
 	    sizeof(u_int32_t), nentries, &tmpmap)) != 0) {
+		UNLOCK_LOCKERS(env, region);
 		__os_free(env, bitmap);
 		return (ret);
 	}
 
 	if ((ret = __os_calloc(env,
 	    (size_t)count, sizeof(locker_info), &id_array)) != 0) {
+		UNLOCK_LOCKERS(env, region);
 		__os_free(env, bitmap);
 		__os_free(env, tmpmap);
 		return (ret);
 	}
 
 	/*
-	 * Now go back in and actually fill in the matrix.
-	 */
-	if (region->stat.st_nlockers > count) {
-		__os_free(env, bitmap);
-		__os_free(env, tmpmap);
-		__os_free(env, id_array);
-		goto retry;
-	}
-
-	/*
 	 * First we go through and assign each locker a deadlock detector id.
 	 */
 	id = 0;
-	LOCK_LOCKERS(env, region);
 	SH_TAILQ_FOREACH(lip, &region->lockers, ulinks, __db_locker) {
 		if (lip->master_locker == INVALID_ROFF) {
+			DB_ASSERT(env, id < count);
 			lip->dd_id = id++;
 			id_array[lip->dd_id].id = lip->id;
 			switch (atype) {
@@ -510,7 +511,6 @@ retry:	count = region->stat.st_nlockers;
 			lip->dd_id = DD_INVALID_ID;
 
 	}
-	UNLOCK_LOCKERS(env, region);
 
 	/*
 	 * We only need consider objects that have waiters, so we use
@@ -669,7 +669,6 @@ again:	memset(bitmap, 0, count * sizeof
 	 * status after building the bit maps so that we will not detect
 	 * a blocked transaction without noting that it is already aborting.
 	 */
-	LOCK_LOCKERS(env, region);
 	for (id = 0; id < count; id++) {
 		if (!id_array[id].valid)
 			continue;
@@ -738,6 +737,7 @@ get_lock:	id_array[id].last_lock = R_OF
 		id_array[id].in_abort = 1;
 	}
 	UNLOCK_LOCKERS(env, region);
+	LOCK_SYSTEM_UNLOCK(lt, region);
 
 	/*
 	 * Now we can release everything except the bitmap matrix that we
@@ -839,6 +839,7 @@ __dd_abort(env, info, statusp)
 	ret = 0;
 
 	/* We must lock so this locker cannot go away while we abort it. */
+	LOCK_SYSTEM_LOCK(lt, region);
 	LOCK_LOCKERS(env, region);
 
 	/*
@@ -895,6 +896,7 @@ __dd_abort(env, info, statusp)
done:	OBJECT_UNLOCK(lt, region, info->last_ndx);
 err:
out:	UNLOCK_LOCKERS(env, region);
+	LOCK_SYSTEM_UNLOCK(lt, region);
 	return (ret);
 }
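
The exit-path rewrite in __lock_detect() above collapses the old err1/err label pair into a single err label: every map pointer is initialized to NULL before the first allocation, so one exit path can free exactly what was allocated, and the separate free_me alias for the __dd_find() result becomes an ordinary deadlist variable. The patch keeps an explicit NULL check before each __os_free() call, which suggests __os_free() is not assumed to tolerate NULL. A minimal standalone sketch of the idiom, using plain libc calloc()/free() instead of the __os_* wrappers (detect_sketch is an illustrative name, not Berkeley DB code):

#include <errno.h>
#include <stdlib.h>

static int
detect_sketch(size_t nalloc)
{
	unsigned int *bitmap = NULL, *copymap = NULL, *tmpmap = NULL;
	int ret = 0;

	if ((bitmap = calloc(nalloc, sizeof(*bitmap))) == NULL ||
	    (copymap = calloc(nalloc, sizeof(*copymap))) == NULL ||
	    (tmpmap = calloc(nalloc, sizeof(*tmpmap))) == NULL) {
		ret = ENOMEM;
		goto err;		/* Any of the three may have failed. */
	}

	/* ... detection work elided ... */

err:	if (copymap != NULL)		/* Mirrors the patched exit path; with */
		free(copymap);		/* libc, free(NULL) is already a no-op. */
	if (tmpmap != NULL)
		free(tmpmap);
	free(bitmap);
	return (ret);
}

The order of the frees is immaterial; what matters is that every path to err finds each pointer either NULL or owning memory.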
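
The "count by count bitmap matrix" named in the new one-line comment is the waits-for graph itself: each locker owns a row of nentries 32-bit words, and bit j set in row i means locker i waits on locker j. That layout is what makes the killid computation in __lock_detect(), killid = (u_int32_t)(*deadp - bitmap) / nalloc, work: __dd_find() hands back pointers into the matrix, and dividing the pointer offset by the row width recovers the row index, i.e. the locker's deadlock-detector id. A sketch of the addressing under those layout assumptions (row_words, set_dep, and row_to_id are illustrative helpers, not Berkeley DB functions):

#include <stdint.h>

/* Words per row for "count" lockers; equivalent to DB_ALIGN(count, 32) / 32. */
static uint32_t
row_words(uint32_t count)
{
	return (((count + 31) & ~(uint32_t)31) / 32);
}

/* Record "locker i waits on locker j". */
static void
set_dep(uint32_t *bitmap, uint32_t nentries, uint32_t i, uint32_t j)
{
	bitmap[i * nentries + j / 32] |= (uint32_t)1 << (j % 32);
}

/* Recover a locker's dd id from a pointer to its row, as killid is recovered. */
static uint32_t
row_to_id(const uint32_t *bitmap, const uint32_t *row, uint32_t nentries)
{
	return ((uint32_t)(row - bitmap) / nentries);
}

This also motivates the new DB_ASSERT(env, id < count) in __dd_build(): once the matrix is sized under LOCK_LOCKERS, handing out more than count ids would mean writing past the end of a row's worth of storage, so the assertion stands in for the removed grow-and-retry logic.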
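
The deeper change in __dd_build() is when the counting and the allocation happen relative to the locks. The old code sampled st_nlockers without the mutex, padded the count by 20 for good measure, allocated, and then retried from retry: if the locker table had outgrown the padded count in the meantime; the patched code takes LOCK_LOCKERS first and holds it across the sizing, the allocations, and the walk that assigns dd ids, so the count can no longer change underneath it. A compressed sketch of the new shape, with a pthreads mutex standing in for the region's lockers mutex (snapshot_lockers and n_lockers are hypothetical stand-ins, not the Berkeley DB primitives):

#include <pthread.h>
#include <stdlib.h>

static pthread_mutex_t lockers_mtx = PTHREAD_MUTEX_INITIALIZER;
static size_t n_lockers;		/* protected by lockers_mtx */

static int
snapshot_lockers(unsigned int **arrayp, size_t *countp)
{
	unsigned int *array;
	size_t count;

	pthread_mutex_lock(&lockers_mtx);
	count = n_lockers;		/* stable until we unlock */
	if (count == 0) {
		pthread_mutex_unlock(&lockers_mtx);
		*arrayp = NULL;
		*countp = 0;
		return (0);
	}
	if ((array = calloc(count, sizeof(*array))) == NULL) {
		pthread_mutex_unlock(&lockers_mtx);
		return (-1);		/* every exit path releases the mutex */
	}
	/* ... walk the locker list, filling array[0] .. array[count - 1] ... */
	pthread_mutex_unlock(&lockers_mtx);

	*arrayp = array;
	*countp = count;
	return (0);
}

The trade-off is allocating inside the critical section, which the retry scheme existed to avoid; the patch accepts that cost, and in exchange every early return must release the mutex, which is exactly what the added UNLOCK_LOCKERS() calls on the allocation-failure paths do. The synchronization comment added above __dd_build() also documents the acquisition order that makes this safe: LOCK_SYSTEM_LOCK before LOCK_LOCKERS, the same order the patch establishes in __dd_abort(), so the build and abort paths cannot deadlock against each other on those two locks.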