lock.c 156 KB
Newer Older
1 2 3
/******************************************************************************
*******************************************************************************
**
4
**  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/* Central locking logic has four stages:

   dlm_lock()
   dlm_unlock()

   request_lock(ls, lkb)
   convert_lock(ls, lkb)
   unlock_lock(ls, lkb)
   cancel_lock(ls, lkb)

   _request_lock(r, lkb)
   _convert_lock(r, lkb)
   _unlock_lock(r, lkb)
   _cancel_lock(r, lkb)

   do_request(r, lkb)
   do_convert(r, lkb)
   do_unlock(r, lkb)
   do_cancel(r, lkb)

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

       dlm_lock          = request_lock
       dlm_lock+CONVERT  = convert_lock
       dlm_unlock        = unlock_lock
       dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
David Teigland's avatar
David Teigland committed
58
#include <linux/types.h>
59
#include <linux/rbtree.h>
60
#include <linux/slab.h>
61
#include "dlm_internal.h"
David Teigland's avatar
David Teigland committed
62
#include <linux/dlm_device.h>
63 64 65 66 67 68 69 70 71 72 73 74
#include "memory.h"
#include "lowcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
#include "member.h"
#include "lockspace.h"
#include "ast.h"
#include "lock.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
David Teigland's avatar
David Teigland committed
75
#include "user.h"
76 77 78 79 80 81 82 83 84 85 86
#include "config.h"

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 89 90
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
91
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92
static void del_timeout(struct dlm_lkb *lkb);
93
static void toss_rsb(struct kref *kref);
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161

/*
 * Lock compatibilty matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};

#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}

/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

David Teigland's avatar
David Teigland committed
162
void dlm_print_lkb(struct dlm_lkb *lkb)
163
{
164
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
David Teigland's avatar
David Teigland committed
165
	       "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
166 167
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
David Teigland's avatar
David Teigland committed
168 169
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
	       (unsigned long long)lkb->lkb_recover_seq);
170 171
}

Adrian Bunk's avatar
Adrian Bunk committed
172
static void dlm_print_rsb(struct dlm_rsb *r)
173
{
174 175 176 177 178
	printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
	       "rlc %d name %s\n",
	       r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
	       r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
	       r->res_name);
179 180
}

181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202
void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}

203 204
/* Threads cannot use the lockspace while it's being recovered */

205
static inline void dlm_lock_recovery(struct dlm_ls *ls)
206 207 208 209
{
	down_read(&ls->ls_in_recovery);
}

210
void dlm_unlock_recovery(struct dlm_ls *ls)
211 212 213 214
{
	up_read(&ls->ls_in_recovery);
}

215
int dlm_lock_recovery_try(struct dlm_ls *ls)
216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
{
	return down_read_trylock(&ls->ls_in_recovery);
}

static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

David Teigland's avatar
David Teigland committed
235 236 237 238 239 240 241 242 243 244
static inline int is_altmode(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}

static inline int is_granted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
}

245 246 247 248 249 250 251 252 253 254 255 256 257
static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
258
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
259 260 261 262 263 264
}

static inline int middle_conversion(struct dlm_lkb *lkb)
{
	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265 266
		return 1;
	return 0;
267 268 269 270 271 272 273
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}

274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289
static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}

static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}

static inline int is_overlap(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
				  DLM_IFL_OVERLAP_CANCEL));
}

290 291 292 293 294
static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

295 296
	del_timeout(lkb);

297 298
	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

299 300 301 302 303 304 305
	/* if the operation was a cancel, then return -DLM_ECANCEL, if a
	   timeout caused the cancel then return -ETIMEDOUT */
	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
		rv = -ETIMEDOUT;
	}

306 307 308 309 310
	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
		rv = -EDEADLK;
	}

311
	dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
312 313
}

314 315 316 317 318 319
static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	queue_cast(r, lkb,
		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
}

320 321
static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
322
	if (is_master_copy(lkb)) {
323
		send_bast(r, lkb, rqmode);
324
	} else {
325
		dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
326
	}
327 328 329 330 331 332
}

/*
 * Basic operations on rsb's and lkb's
 */

333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363
/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}

/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	spin_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}

David Teigland's avatar
David Teigland committed
364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401
static int pre_rsb_struct(struct dlm_ls *ls)
{
	struct dlm_rsb *r1, *r2;
	int count = 0;

	spin_lock(&ls->ls_new_rsb_spin);
	if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
		spin_unlock(&ls->ls_new_rsb_spin);
		return 0;
	}
	spin_unlock(&ls->ls_new_rsb_spin);

	r1 = dlm_allocate_rsb(ls);
	r2 = dlm_allocate_rsb(ls);

	spin_lock(&ls->ls_new_rsb_spin);
	if (r1) {
		list_add(&r1->res_hashchain, &ls->ls_new_rsb);
		ls->ls_new_rsb_count++;
	}
	if (r2) {
		list_add(&r2->res_hashchain, &ls->ls_new_rsb);
		ls->ls_new_rsb_count++;
	}
	count = ls->ls_new_rsb_count;
	spin_unlock(&ls->ls_new_rsb_spin);

	if (!count)
		return -ENOMEM;
	return 0;
}

/* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
   unlock any spinlocks, go back and call pre_rsb_struct again.
   Otherwise, take an rsb off the list and return it. */

static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
			  struct dlm_rsb **r_ret)
402 403
{
	struct dlm_rsb *r;
David Teigland's avatar
David Teigland committed
404 405 406 407 408 409 410 411 412 413
	int count;

	spin_lock(&ls->ls_new_rsb_spin);
	if (list_empty(&ls->ls_new_rsb)) {
		count = ls->ls_new_rsb_count;
		spin_unlock(&ls->ls_new_rsb_spin);
		log_debug(ls, "find_rsb retry %d %d %s",
			  count, dlm_config.ci_new_rsb_count, name);
		return -EAGAIN;
	}
414

David Teigland's avatar
David Teigland committed
415 416
	r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
	list_del(&r->res_hashchain);
417 418
	/* Convert the empty list_head to a NULL rb_node for tree usage: */
	memset(&r->res_hashnode, 0, sizeof(struct rb_node));
David Teigland's avatar
David Teigland committed
419 420
	ls->ls_new_rsb_count--;
	spin_unlock(&ls->ls_new_rsb_spin);
421 422 423 424

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
425
	mutex_init(&r->res_mutex);
426 427 428 429 430 431 432 433

	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

David Teigland's avatar
David Teigland committed
434 435
	*r_ret = r;
	return 0;
436 437
}

438 439 440 441 442 443 444 445 446
static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
{
	char maxname[DLM_RESNAME_MAXLEN];

	memset(maxname, 0, DLM_RESNAME_MAXLEN);
	memcpy(maxname, name, nlen);
	return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
}

447
int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
448
			struct dlm_rsb **r_ret)
449
{
450
	struct rb_node *node = tree->rb_node;
451
	struct dlm_rsb *r;
452 453 454 455 456 457 458 459 460 461
	int rc;

	while (node) {
		r = rb_entry(node, struct dlm_rsb, res_hashnode);
		rc = rsb_cmp(r, name, len);
		if (rc < 0)
			node = node->rb_left;
		else if (rc > 0)
			node = node->rb_right;
		else
462 463
			goto found;
	}
464
	*r_ret = NULL;
David Teigland's avatar
David Teigland committed
465
	return -EBADR;
466 467 468

 found:
	*r_ret = r;
469
	return 0;
470 471
}

472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500
static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
{
	struct rb_node **newn = &tree->rb_node;
	struct rb_node *parent = NULL;
	int rc;

	while (*newn) {
		struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
					       res_hashnode);

		parent = *newn;
		rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
		if (rc < 0)
			newn = &parent->rb_left;
		else if (rc > 0)
			newn = &parent->rb_right;
		else {
			log_print("rsb_insert match");
			dlm_dump_rsb(rsb);
			dlm_dump_rsb(cur);
			return -EEXIST;
		}
	}

	rb_link_node(&rsb->res_hashnode, parent, newn);
	rb_insert_color(&rsb->res_hashnode, tree);
	return 0;
}

501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548
/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 *
 * rsb's on the keep list are being used locally and refcounted.
 * rsb's on the toss list are not being used locally, and are not refcounted.
 *
 * The toss list rsb's were either
 * - previously used locally but not any more (were on keep list, then
 *   moved to toss list when last refcount dropped)
 * - created and put on toss list as a directory record for a lookup
 *   (we are the dir node for the res, but are not using the res right now,
 *   but some other node is)
 *
 * The purpose of find_rsb() is to return a refcounted rsb for local use.
 * So, if the given rsb is on the toss list, it is moved to the keep list
 * before being returned.
 *
 * toss_rsb() happens when all local usage of the rsb is done, i.e. no
 * more refcounts exist, so the rsb is moved from the keep list to the
 * toss list.
 *
 * rsb's on both keep and toss lists are used for doing a name to master
 * lookups.  rsb's that are in use locally (and being refcounted) are on
 * the keep list, rsb's that are not in use locally (not refcounted) and
 * only exist for name/master lookups are on the toss list.
 *
 * rsb's on the toss list who's dir_nodeid is not local can have stale
 * name/master mappings.  So, remote requests on such rsb's can potentially
 * return with an error, which means the mapping is stale and needs to
 * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
 * first_lkid is to keep only a single outstanding request on an rsb
 * while that rsb has a potentially stale master.)
 */

static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
			uint32_t hash, uint32_t b,
			int dir_nodeid, int from_nodeid,
			unsigned int flags, struct dlm_rsb **r_ret)
549
{
550 551 552 553 554 555
	struct dlm_rsb *r = NULL;
	int our_nodeid = dlm_our_nodeid();
	int from_local = 0;
	int from_other = 0;
	int from_dir = 0;
	int create = 0;
556 557
	int error;

558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584
	if (flags & R_RECEIVE_REQUEST) {
		if (from_nodeid == dir_nodeid)
			from_dir = 1;
		else
			from_other = 1;
	} else if (flags & R_REQUEST) {
		from_local = 1;
	}

	/*
	 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
	 * from_nodeid has sent us a lock in dlm_recover_locks, believing
	 * we're the new master.  Our local recovery may not have set
	 * res_master_nodeid to our_nodeid yet, so allow either.  Don't
	 * create the rsb; dlm_recover_process_copy() will handle EBADR
	 * by resending.
	 *
	 * If someone sends us a request, we are the dir node, and we do
	 * not find the rsb anywhere, then recreate it.  This happens if
	 * someone sends us a request after we have removed/freed an rsb
	 * from our toss list.  (They sent a request instead of lookup
	 * because they are using an rsb from their toss list.)
	 */

	if (from_local || from_dir ||
	    (from_other && (dir_nodeid == our_nodeid))) {
		create = 1;
585
	}
586

587 588 589 590 591 592 593 594 595 596
 retry:
	if (create) {
		error = pre_rsb_struct(ls);
		if (error < 0)
			goto out;
	}

	spin_lock(&ls->ls_rsbtbl[b].lock);

	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
597
	if (error)
598 599 600 601 602
		goto do_toss;
	
	/*
	 * rsb is active, so we can't check master_nodeid without lock_rsb.
	 */
603

604 605 606 607 608 609 610
	kref_get(&r->res_ref);
	error = 0;
	goto out_unlock;


 do_toss:
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
611
	if (error)
612
		goto do_new;
613

614 615 616 617 618 619
	/*
	 * rsb found inactive (master_nodeid may be out of date unless
	 * we are the dir_nodeid or were the master)  No other thread
	 * is using this rsb because it's on the toss list, so we can
	 * look at or update res_master_nodeid without lock_rsb.
	 */
620

621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638
	if ((r->res_master_nodeid != our_nodeid) && from_other) {
		/* our rsb was not master, and another node (not the dir node)
		   has sent us a request */
		log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
			  from_nodeid, r->res_master_nodeid, dir_nodeid,
			  r->res_name);
		error = -ENOTBLK;
		goto out_unlock;
	}

	if ((r->res_master_nodeid != our_nodeid) && from_dir) {
		/* don't think this should ever happen */
		log_error(ls, "find_rsb toss from_dir %d master %d",
			  from_nodeid, r->res_master_nodeid);
		dlm_print_rsb(r);
		/* fix it and go on */
		r->res_master_nodeid = our_nodeid;
		r->res_nodeid = 0;
639 640
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
641 642 643 644 645
	}

	if (from_local && (r->res_master_nodeid != our_nodeid)) {
		/* Because we have held no locks on this rsb,
		   res_master_nodeid could have become stale. */
646 647
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689
	}

	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
	goto out_unlock;


 do_new:
	/*
	 * rsb not found
	 */

	if (error == -EBADR && !create)
		goto out_unlock;

	error = get_rsb_struct(ls, name, len, &r);
	if (error == -EAGAIN) {
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		goto retry;
	}
	if (error)
		goto out_unlock;

	r->res_hash = hash;
	r->res_bucket = b;
	r->res_dir_nodeid = dir_nodeid;
	kref_init(&r->res_ref);

	if (from_dir) {
		/* want to see how often this happens */
		log_debug(ls, "find_rsb new from_dir %d recreate %s",
			  from_nodeid, r->res_name);
		r->res_master_nodeid = our_nodeid;
		r->res_nodeid = 0;
		goto out_add;
	}

	if (from_other && (dir_nodeid != our_nodeid)) {
		/* should never happen */
		log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
			  from_nodeid, dir_nodeid, our_nodeid, r->res_name);
		dlm_free_rsb(r);
690
		r = NULL;
691 692 693 694 695 696 697 698 699 700 701 702 703 704
		error = -ENOTBLK;
		goto out_unlock;
	}

	if (from_other) {
		log_debug(ls, "find_rsb new from_other %d dir %d %s",
			  from_nodeid, dir_nodeid, r->res_name);
	}

	if (dir_nodeid == our_nodeid) {
		/* When we are the dir nodeid, we can set the master
		   node immediately */
		r->res_master_nodeid = our_nodeid;
		r->res_nodeid = 0;
705
	} else {
706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771
		/* set_master will send_lookup to dir_nodeid */
		r->res_master_nodeid = 0;
		r->res_nodeid = -1;
	}

 out_add:
	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
 out_unlock:
	spin_unlock(&ls->ls_rsbtbl[b].lock);
 out:
	*r_ret = r;
	return error;
}

/* During recovery, other nodes can send us new MSTCPY locks (from
   dlm_recover_locks) before we've made ourself master (in
   dlm_recover_masters). */

static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
			  uint32_t hash, uint32_t b,
			  int dir_nodeid, int from_nodeid,
			  unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r = NULL;
	int our_nodeid = dlm_our_nodeid();
	int recover = (flags & R_RECEIVE_RECOVER);
	int error;

 retry:
	error = pre_rsb_struct(ls);
	if (error < 0)
		goto out;

	spin_lock(&ls->ls_rsbtbl[b].lock);

	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
	if (error)
		goto do_toss;

	/*
	 * rsb is active, so we can't check master_nodeid without lock_rsb.
	 */

	kref_get(&r->res_ref);
	goto out_unlock;


 do_toss:
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
	if (error)
		goto do_new;

	/*
	 * rsb found inactive. No other thread is using this rsb because
	 * it's on the toss list, so we can look at or update
	 * res_master_nodeid without lock_rsb.
	 */

	if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
		/* our rsb is not master, and another node has sent us a
		   request; this should never happen */
		log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
			  from_nodeid, r->res_master_nodeid, dir_nodeid);
		dlm_print_rsb(r);
		error = -ENOTBLK;
		goto out_unlock;
772
	}
773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812

	if (!recover && (r->res_master_nodeid != our_nodeid) &&
	    (dir_nodeid == our_nodeid)) {
		/* our rsb is not master, and we are dir; may as well fix it;
		   this should never happen */
		log_error(ls, "find_rsb toss our %d master %d dir %d",
			  our_nodeid, r->res_master_nodeid, dir_nodeid);
		dlm_print_rsb(r);
		r->res_master_nodeid = our_nodeid;
		r->res_nodeid = 0;
	}

	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
	goto out_unlock;


 do_new:
	/*
	 * rsb not found
	 */

	error = get_rsb_struct(ls, name, len, &r);
	if (error == -EAGAIN) {
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		goto retry;
	}
	if (error)
		goto out_unlock;

	r->res_hash = hash;
	r->res_bucket = b;
	r->res_dir_nodeid = dir_nodeid;
	r->res_master_nodeid = dir_nodeid;
	r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
	kref_init(&r->res_ref);

	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
 out_unlock:
	spin_unlock(&ls->ls_rsbtbl[b].lock);
813 814 815 816 817
 out:
	*r_ret = r;
	return error;
}

818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882
static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	uint32_t hash, b;
	int dir_nodeid;

	if (len > DLM_RESNAME_MAXLEN)
		return -EINVAL;

	hash = jhash(name, len, 0);
	b = hash & (ls->ls_rsbtbl_size - 1);

	dir_nodeid = dlm_hash2nodeid(ls, hash);

	if (dlm_no_directory(ls))
		return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
				      from_nodeid, flags, r_ret);
	else
		return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
				      from_nodeid, flags, r_ret);
}

/* we have received a request and found that res_master_nodeid != our_nodeid,
   so we need to return an error or make ourself the master */

static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
				  int from_nodeid)
{
	if (dlm_no_directory(ls)) {
		log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
			  from_nodeid, r->res_master_nodeid,
			  r->res_dir_nodeid);
		dlm_print_rsb(r);
		return -ENOTBLK;
	}

	if (from_nodeid != r->res_dir_nodeid) {
		/* our rsb is not master, and another node (not the dir node)
	   	   has sent us a request.  this is much more common when our
	   	   master_nodeid is zero, so limit debug to non-zero.  */

		if (r->res_master_nodeid) {
			log_debug(ls, "validate master from_other %d master %d "
				  "dir %d first %x %s", from_nodeid,
				  r->res_master_nodeid, r->res_dir_nodeid,
				  r->res_first_lkid, r->res_name);
		}
		return -ENOTBLK;
	} else {
		/* our rsb is not master, but the dir nodeid has sent us a
	   	   request; this could happen with master 0 / res_nodeid -1 */

		if (r->res_master_nodeid) {
			log_error(ls, "validate master from_dir %d master %d "
				  "first %x %s",
				  from_nodeid, r->res_master_nodeid,
				  r->res_first_lkid, r->res_name);
		}

		r->res_master_nodeid = dlm_our_nodeid();
		r->res_nodeid = 0;
		return 0;
	}
}

883
/*
884 885 886 887
 * We're the dir node for this res and another node wants to know the
 * master nodeid.  During normal operation (non recovery) this is only
 * called from receive_lookup(); master lookups when the local node is
 * the dir node are done by find_rsb().
888
 *
889 890 891 892 893 894
 * normal operation, we are the dir node for a resource
 * . _request_lock
 * . set_master
 * . send_lookup
 * . receive_lookup
 * . dlm_master_lookup flags 0
895
 *
896 897 898 899 900 901 902 903 904 905 906 907 908 909
 * recover directory, we are rebuilding dir for all resources
 * . dlm_recover_directory
 * . dlm_rcom_names
 *   remote node sends back the rsb names it is master of and we are dir of
 * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
 *   we either create new rsb setting remote node as master, or find existing
 *   rsb and set master to be the remote node.
 *
 * recover masters, we are finding the new master for resources
 * . dlm_recover_masters
 * . recover_master
 * . dlm_send_rcom_lookup
 * . receive_rcom_lookup
 * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
910 911
 */

912 913
int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
		      unsigned int flags, int *r_nodeid, int *result)
914
{
David Teigland's avatar
David Teigland committed
915
	struct dlm_rsb *r = NULL;
916 917 918 919 920
	uint32_t hash, b;
	int from_master = (flags & DLM_LU_RECOVER_DIR);
	int fix_master = (flags & DLM_LU_RECOVER_MASTER);
	int our_nodeid = dlm_our_nodeid();
	int dir_nodeid, error, toss_list = 0;
921

922 923 924 925 926 927 928
	if (len > DLM_RESNAME_MAXLEN)
		return -EINVAL;

	if (from_nodeid == our_nodeid) {
		log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
			  our_nodeid, flags);
		return -EINVAL;
David Teigland's avatar
David Teigland committed
929
	}
930

931 932
	hash = jhash(name, len, 0);
	b = hash & (ls->ls_rsbtbl_size - 1);
933

934 935 936 937 938 939 940 941
	dir_nodeid = dlm_hash2nodeid(ls, hash);
	if (dir_nodeid != our_nodeid) {
		log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
			  from_nodeid, dir_nodeid, our_nodeid, hash,
			  ls->ls_num_nodes);
		*r_nodeid = -1;
		return -EINVAL;
	}
942

David Teigland's avatar
David Teigland committed
943
 retry:
944 945 946 947 948 949 950 951 952 953 954 955 956 957
	error = pre_rsb_struct(ls);
	if (error < 0)
		return error;

	spin_lock(&ls->ls_rsbtbl[b].lock);
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
	if (!error) {
		/* because the rsb is active, we need to lock_rsb before
		   checking/changing re_master_nodeid */

		hold_rsb(r);
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		lock_rsb(r);
		goto found;
David Teigland's avatar
David Teigland committed
958 959
	}

960 961 962
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
	if (error)
		goto not_found;
David Teigland's avatar
David Teigland committed
963

964 965
	/* because the rsb is inactive (on toss list), it's not refcounted
	   and lock_rsb is not used, but is protected by the rsbtbl lock */
966

967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991
	toss_list = 1;
 found:
	if (r->res_dir_nodeid != our_nodeid) {
		/* should not happen, but may as well fix it and carry on */
		log_error(ls, "dlm_master_lookup res_dir %d our %d %s",
			  r->res_dir_nodeid, our_nodeid, r->res_name);
		r->res_dir_nodeid = our_nodeid;
	}

	if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
		/* Recovery uses this function to set a new master when
		   the previous master failed.  Setting NEW_MASTER will
		   force dlm_recover_masters to call recover_master on this
		   rsb even though the res_nodeid is no longer removed. */

		r->res_master_nodeid = from_nodeid;
		r->res_nodeid = from_nodeid;
		rsb_set_flag(r, RSB_NEW_MASTER);

		if (toss_list) {
			/* I don't think we should ever find it on toss list. */
			log_error(ls, "dlm_master_lookup fix_master on toss");
			dlm_dump_rsb(r);
		}
	}
992

993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050
	if (from_master && (r->res_master_nodeid != from_nodeid)) {
		/* this will happen if from_nodeid became master during
		   a previous recovery cycle, and we aborted the previous
		   cycle before recovering this master value */

		log_limit(ls, "dlm_master_lookup from_master %d "
			  "master_nodeid %d res_nodeid %d first %x %s",
			  from_nodeid, r->res_master_nodeid, r->res_nodeid,
			  r->res_first_lkid, r->res_name);

		if (r->res_master_nodeid == our_nodeid) {
			log_error(ls, "from_master %d our_master", from_nodeid);
			dlm_dump_rsb(r);
			dlm_send_rcom_lookup_dump(r, from_nodeid);
			goto out_found;
		}

		r->res_master_nodeid = from_nodeid;
		r->res_nodeid = from_nodeid;
		rsb_set_flag(r, RSB_NEW_MASTER);
	}

	if (!r->res_master_nodeid) {
		/* this will happen if recovery happens while we're looking
		   up the master for this rsb */

		log_debug(ls, "dlm_master_lookup master 0 to %d first %x %s",
			  from_nodeid, r->res_first_lkid, r->res_name);
		r->res_master_nodeid = from_nodeid;
		r->res_nodeid = from_nodeid;
	}

	if (!from_master && !fix_master &&
	    (r->res_master_nodeid == from_nodeid)) {
		/* this can happen when the master sends remove, the dir node
		   finds the rsb on the keep list and ignores the remove,
		   and the former master sends a lookup */

		log_limit(ls, "dlm_master_lookup from master %d flags %x "
			  "first %x %s", from_nodeid, flags,
			  r->res_first_lkid, r->res_name);
	}

 out_found:
	*r_nodeid = r->res_master_nodeid;
	if (result)
		*result = DLM_LU_MATCH;

	if (toss_list) {
		r->res_toss_time = jiffies;
		/* the rsb was inactive (on toss list) */
		spin_unlock(&ls->ls_rsbtbl[b].lock);
	} else {
		/* the rsb was active */
		unlock_rsb(r);
		put_rsb(r);
	}
	return 0;
1051

1052 1053
 not_found:
	error = get_rsb_struct(ls, name, len, &r);
David Teigland's avatar
David Teigland committed
1054
	if (error == -EAGAIN) {
1055
		spin_unlock(&ls->ls_rsbtbl[b].lock);
David Teigland's avatar
David Teigland committed
1056 1057 1058 1059
		goto retry;
	}
	if (error)
		goto out_unlock;
1060 1061

	r->res_hash = hash;
1062 1063 1064 1065
	r->res_bucket = b;
	r->res_dir_nodeid = our_nodeid;
	r->res_master_nodeid = from_nodeid;
	r->res_nodeid = from_nodeid;
1066
	kref_init(&r->res_ref);
1067
	r->res_toss_time = jiffies;
1068

1069 1070 1071 1072 1073 1074
	error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
	if (error) {
		/* should never happen */
		dlm_free_rsb(r);
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		goto retry;
1075
	}
1076 1077 1078 1079 1080

	if (result)
		*result = DLM_LU_ADD;
	*r_nodeid = from_nodeid;
	error = 0;
David Teigland's avatar
David Teigland committed
1081
 out_unlock:
1082
	spin_unlock(&ls->ls_rsbtbl[b].lock);
1083 1084 1085
	return error;
}

1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102
static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
{
	struct rb_node *n;
	struct dlm_rsb *r;
	int i;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		spin_lock(&ls->ls_rsbtbl[i].lock);
		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
			r = rb_entry(n, struct dlm_rsb, res_hashnode);
			if (r->res_hash == hash)
				dlm_dump_rsb(r);
		}
		spin_unlock(&ls->ls_rsbtbl[i].lock);
	}
}

1103
void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
1104
{
1105 1106 1107
	struct dlm_rsb *r = NULL;
	uint32_t hash, b;
	int error;
1108

1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123
	hash = jhash(name, len, 0);
	b = hash & (ls->ls_rsbtbl_size - 1);

	spin_lock(&ls->ls_rsbtbl[b].lock);
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
	if (!error)
		goto out_dump;

	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
	if (error)
		goto out;
 out_dump:
	dlm_dump_rsb(r);
 out:
	spin_unlock(&ls->ls_rsbtbl[b].lock);
1124 1125 1126 1127 1128 1129 1130 1131 1132
}

static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	kref_init(&r->res_ref);
1133 1134
	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
	rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
1135
	r->res_toss_time = jiffies;
1136
	ls->ls_rsbtbl[r->res_bucket].flags |= DLM_RTF_SHRINK;
1137
	if (r->res_lvbptr) {
1138
		dlm_free_lvb(r->res_lvbptr);
1139 1140 1141 1142 1143 1144 1145 1146 1147 1148
		r->res_lvbptr = NULL;
	}
}

/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
1149
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
1150 1151 1152 1153 1154 1155 1156 1157 1158
}

static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

1159 1160 1161 1162 1163 1164
	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185
}

/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}

static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
David Teigland's avatar
David Teigland committed
1186
	struct dlm_lkb *lkb;
Tejun Heo's avatar
Tejun Heo committed
1187
	int rv;
1188

1189
	lkb = dlm_allocate_lkb(ls);
1190 1191 1192 1193 1194 1195
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
David Teigland's avatar
David Teigland committed
1196
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1197
	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1198
	INIT_LIST_HEAD(&lkb->lkb_time_list);
1199 1200 1201
	INIT_LIST_HEAD(&lkb->lkb_cb_list);
	mutex_init(&lkb->lkb_cb_mutex);
	INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
1202

Tejun Heo's avatar
Tejun Heo committed
1203
	idr_preload(GFP_NOFS);
David Teigland's avatar
David Teigland committed
1204
	spin_lock(&ls->ls_lkbidr_spin);
Tejun Heo's avatar
Tejun Heo committed
1205 1206 1207
	rv = idr_alloc(&ls->ls_lkbidr, lkb, 1, 0, GFP_NOWAIT);
	if (rv >= 0)
		lkb->lkb_id = rv;
David Teigland's avatar
David Teigland committed
1208
	spin_unlock(&ls->ls_lkbidr_spin);
Tejun Heo's avatar
Tejun Heo committed
1209
	idr_preload_end();
1210

David Teigland's avatar
David Teigland committed
1211 1212 1213
	if (rv < 0) {
		log_error(ls, "create_lkb idr error %d", rv);
		return rv;
1214 1215 1216 1217 1218 1219 1220 1221 1222 1223
	}

	*lkb_ret = lkb;
	return 0;
}

static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;

David Teigland's avatar
David Teigland committed
1224 1225
	spin_lock(&ls->ls_lkbidr_spin);
	lkb = idr_find(&ls->ls_lkbidr, lkid);
1226 1227
	if (lkb)
		kref_get(&lkb->lkb_ref);
David Teigland's avatar
David Teigland committed
1228
	spin_unlock(&ls->ls_lkbidr_spin);
1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}

static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}

1244 1245 1246 1247
/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1248
{
David Teigland's avatar
David Teigland committed
1249
	uint32_t lkid = lkb->lkb_id;
1250

David Teigland's avatar
David Teigland committed
1251
	spin_lock(&ls->ls_lkbidr_spin);
1252
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
David Teigland's avatar
David Teigland committed
1253 1254
		idr_remove(&ls->ls_lkbidr, lkid);
		spin_unlock(&ls->ls_lkbidr_spin);
1255 1256 1257 1258 1259

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
1260 1261
			dlm_free_lvb(lkb->lkb_lvbptr);
		dlm_free_lkb(lkb);
1262 1263
		return 1;
	} else {
David Teigland's avatar
David Teigland committed
1264
		spin_unlock(&ls->ls_lkbidr_spin);
1265 1266 1267 1268 1269 1270
		return 0;
	}
}

int dlm_put_lkb(struct dlm_lkb *lkb)
{
1271 1272 1273 1274 1275 1276 1277
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308
}

/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}

static void lkb_add_ordered(struct list_head *new, struct list_head *head,
			    int mode)
{
	struct dlm_lkb *lkb = NULL;

	list_for_each_entry(lkb, head, lkb_statequeue)
		if (lkb->lkb_rqmode < mode)
			break;

1309
	__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
1310 1311 1312 1313 1314 1315 1316 1317 1318 1319
}

/* add/remove lkb to rsb's grant/convert/wait queue */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

1320 1321
	lkb->lkb_timestamp = ktime_get();

1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362
	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}

static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}

1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379
static int msg_reply_type(int mstype)
{
	switch (mstype) {
	case DLM_MSG_REQUEST:
		return DLM_MSG_REQUEST_REPLY;
	case DLM_MSG_CONVERT:
		return DLM_MSG_CONVERT_REPLY;
	case DLM_MSG_UNLOCK:
		return DLM_MSG_UNLOCK_REPLY;
	case DLM_MSG_CANCEL:
		return DLM_MSG_CANCEL_REPLY;
	case DLM_MSG_LOOKUP:
		return DLM_MSG_LOOKUP_REPLY;
	}
	return -1;
}

1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429
static int nodeid_warned(int nodeid, int num_nodes, int *warned)
{
	int i;

	for (i = 0; i < num_nodes; i++) {
		if (!warned[i]) {
			warned[i] = nodeid;
			return 0;
		}
		if (warned[i] == nodeid)
			return 1;
	}
	return 0;
}

void dlm_scan_waiters(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	ktime_t zero = ktime_set(0, 0);
	s64 us;
	s64 debug_maxus = 0;
	u32 debug_scanned = 0;
	u32 debug_expired = 0;
	int num_nodes = 0;
	int *warned = NULL;

	if (!dlm_config.ci_waitwarn_us)
		return;

	mutex_lock(&ls->ls_waiters_mutex);

	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
		if (ktime_equal(lkb->lkb_wait_time, zero))
			continue;

		debug_scanned++;

		us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));

		if (us < dlm_config.ci_waitwarn_us)
			continue;

		lkb->lkb_wait_time = zero;

		debug_expired++;
		if (us > debug_maxus)
			debug_maxus = us;

		if (!num_nodes) {
			num_nodes = ls->ls_num_nodes;
1430
			warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441
		}
		if (!warned)
			continue;
		if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
			continue;

		log_error(ls, "waitwarn %x %lld %d us check connection to "
			  "node %d", lkb->lkb_id, (long long)us,
			  dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
	}
	mutex_unlock(&ls->ls_waiters_mutex);
1442
	kfree(warned);
1443 1444 1445 1446 1447 1448 1449

	if (debug_expired)
		log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
			  debug_scanned, debug_expired,
			  dlm_config.ci_waitwarn_us, (long long)debug_maxus);
}

1450 1451 1452
/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

1453
static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1454 1455
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1456
	int error = 0;
1457

1458
	mutex_lock(&ls->ls_waiters_mutex);
1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480

	if (is_overlap_unlock(lkb) ||
	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
		error = -EINVAL;
		goto out;
	}

	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
		switch (mstype) {
		case DLM_MSG_UNLOCK:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			break;
		case DLM_MSG_CANCEL:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			break;
		default:
			error = -EBUSY;
			goto out;
		}
		lkb->lkb_wait_count++;
		hold_lkb(lkb);

1481
		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1482 1483
			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
			  lkb->lkb_wait_count, lkb->lkb_flags);
1484 1485
		goto out;
	}
1486 1487 1488 1489 1490 1491

	DLM_ASSERT(!lkb->lkb_wait_count,
		   dlm_print_lkb(lkb);
		   printk("wait_count %d\n", lkb->lkb_wait_count););

	lkb->lkb_wait_count++;
1492
	lkb->lkb_wait_type = mstype;
1493 1494
	lkb->lkb_wait_time = ktime_get();
	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1495
	hold_lkb(lkb);
1496 1497
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
1498
	if (error)
1499
		log_error(ls, "addwait error %x %d flags %x %d %d %s",
1500 1501
			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1502
	mutex_unlock(&ls->ls_waiters_mutex);
1503
	return error;
1504 1505
}

1506 1507 1508 1509 1510
/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

1511 1512
static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
				struct dlm_message *ms)
1513
{
1514 1515
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;
1516

1517
	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
1518
		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1519 1520 1521
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		overlap_done = 1;
		goto out_del;
1522
	}
1523 1524

	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
1525
		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1526 1527 1528 1529 1530
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		overlap_done = 1;
		goto out_del;
	}

1531 1532 1533 1534 1535 1536 1537 1538 1539