diff --git a/block/blk-core.c b/block/blk-core.c
index a2005a485335b5b42082bf02ffd7b3d1e90b3f3c..d0d104268f1a99bed64cd7d7871122a3dda243ec 100644
--- a/block/blk-core.c
+++ b/block/blk-core.c
@@ -145,6 +145,7 @@ static const struct {
 	[BLK_STS_MEDIUM]	= { -ENODATA,	"critical medium" },
 	[BLK_STS_PROTECTION]	= { -EILSEQ,	"protection" },
 	[BLK_STS_RESOURCE]	= { -ENOMEM,	"kernel resource" },
+	[BLK_STS_DEV_RESOURCE]	= { -EBUSY,	"device resource" },
 	[BLK_STS_AGAIN]		= { -EAGAIN,	"nonblocking retry" },
 
 	/* device mapper special case, should not leak out: */
@@ -3282,6 +3283,8 @@ void blk_rq_bio_prep(struct request_queue *q, struct request *rq,
 {
 	if (bio_has_data(bio))
 		rq->nr_phys_segments = bio_phys_segments(q, bio);
+	else if (bio_op(bio) == REQ_OP_DISCARD)
+		rq->nr_phys_segments = 1;
 
 	rq->__data_len = bio->bi_iter.bi_size;
 	rq->bio = rq->biotail = bio;
diff --git a/block/blk-merge.c b/block/blk-merge.c
index 8452fc7164ccbb607af3807fb3bf5d0eb227e57b..782940c65d8a7c44ff24248f6e280e82b0a5bc9e 100644
--- a/block/blk-merge.c
+++ b/block/blk-merge.c
@@ -550,6 +550,24 @@ static bool req_no_special_merge(struct request *req)
 	return !q->mq_ops && req->special;
 }
 
+static bool req_attempt_discard_merge(struct request_queue *q, struct request *req,
+		struct request *next)
+{
+	unsigned short segments = blk_rq_nr_discard_segments(req);
+
+	if (segments >= queue_max_discard_segments(q))
+		goto no_merge;
+	if (blk_rq_sectors(req) + bio_sectors(next->bio) >
+	    blk_rq_get_max_sectors(req, blk_rq_pos(req)))
+		goto no_merge;
+
+	req->nr_phys_segments = segments + blk_rq_nr_discard_segments(next);
+	return true;
+no_merge:
+	req_set_nomerge(q, req);
+	return false;
+}
+
 static int ll_merge_requests_fn(struct request_queue *q, struct request *req,
 				struct request *next)
 {
@@ -683,9 +701,13 @@ static struct request *attempt_merge(struct request_queue *q,
 	 * If we are allowed to merge, then append bio list
 	 * from next to rq and release next. merge_requests_fn
 	 * will have updated segment counts, update sector
-	 * counts here.
+	 * counts here. Handle DISCARDs separately, as they
+	 * have separate settings.
 	 */
-	if (!ll_merge_requests_fn(q, req, next))
+	if (req_op(req) == REQ_OP_DISCARD) {
+		if (!req_attempt_discard_merge(q, req, next))
+			return NULL;
+	} else if (!ll_merge_requests_fn(q, req, next))
 		return NULL;
 
 	/*
@@ -715,7 +737,8 @@ static struct request *attempt_merge(struct request_queue *q,
 
 	req->__data_len += blk_rq_bytes(next);
 
-	elv_merge_requests(q, req, next);
+	if (req_op(req) != REQ_OP_DISCARD)
+		elv_merge_requests(q, req, next);
 
 	/*
 	 * 'next' is going away, so update stats accordingly
diff --git a/block/blk-mq-sched.c b/block/blk-mq-sched.c
index 55c0a745b4277ac86caa1b8eac68d48ccf6d9be9..25c14c58385c86ff3f5b33135d77fb3984eff185 100644
--- a/block/blk-mq-sched.c
+++ b/block/blk-mq-sched.c
@@ -259,6 +259,8 @@ bool blk_mq_sched_try_merge(struct request_queue *q, struct bio *bio,
 		if (!*merged_request)
 			elv_merged_request(q, rq, ELEVATOR_FRONT_MERGE);
 		return true;
+	case ELEVATOR_DISCARD_MERGE:
+		return bio_attempt_discard_merge(q, rq, bio);
 	default:
 		return false;
 	}
diff --git a/block/blk-mq.c b/block/blk-mq.c
index 01f271d40825ebfd6ca82fdd2f887d432946799c..df93102e21494dc7f98456e4376ce7c83fca56ab 100644
--- a/block/blk-mq.c
+++ b/block/blk-mq.c
@@ -1162,6 +1162,8 @@ static bool blk_mq_mark_tag_wait(struct blk_mq_hw_ctx **hctx,
 	return true;
 }
 
+#define BLK_MQ_RESOURCE_DELAY	3		/* ms units */
+
 bool blk_mq_dispatch_rq_list(struct request_queue *q, struct list_head *list,
 			     bool got_budget)
 {
@@ -1169,6 +1171,7 @@ bool blk_mq_dispatch_rq_list(struct request_queue *q, struct list_head *list,
 	struct request *rq, *nxt;
 	bool no_tag = false;
 	int errors, queued;
+	blk_status_t ret = BLK_STS_OK;
 
 	if (list_empty(list))
 		return false;
@@ -1181,7 +1184,6 @@ bool blk_mq_dispatch_rq_list(struct request_queue *q, struct list_head *list,
 	errors = queued = 0;
 	do {
 		struct blk_mq_queue_data bd;
-		blk_status_t ret;
 
 		rq = list_first_entry(list, struct request, queuelist);
 		if (!blk_mq_get_driver_tag(rq, &hctx, false)) {
@@ -1226,7 +1228,7 @@ bool blk_mq_dispatch_rq_list(struct request_queue *q, struct list_head *list,
 		}
 
 		ret = q->mq_ops->queue_rq(hctx, &bd);
-		if (ret == BLK_STS_RESOURCE) {
+		if (ret == BLK_STS_RESOURCE || ret == BLK_STS_DEV_RESOURCE) {
 			/*
 			 * If an I/O scheduler has been configured and we got a
 			 * driver tag for the next request already, free it
@@ -1257,6 +1259,8 @@ bool blk_mq_dispatch_rq_list(struct request_queue *q, struct list_head *list,
 	 * that is where we will continue on next queue run.
 	 */
 	if (!list_empty(list)) {
+		bool needs_restart;
+
 		spin_lock(&hctx->lock);
 		list_splice_init(list, &hctx->dispatch);
 		spin_unlock(&hctx->lock);
@@ -1280,10 +1284,17 @@ bool blk_mq_dispatch_rq_list(struct request_queue *q, struct list_head *list,
 		 * - Some but not all block drivers stop a queue before
 		 *   returning BLK_STS_RESOURCE. Two exceptions are scsi-mq
 		 *   and dm-rq.
+		 *
+		 * If driver returns BLK_STS_RESOURCE and SCHED_RESTART
+		 * bit is set, run queue after a delay to avoid IO stalls
+		 * that could otherwise occur if the queue is idle.
 		 */
-		if (!blk_mq_sched_needs_restart(hctx) ||
+		needs_restart = blk_mq_sched_needs_restart(hctx);
+		if (!needs_restart ||
 		    (no_tag && list_empty_careful(&hctx->dispatch_wait.entry)))
 			blk_mq_run_hw_queue(hctx, true);
+		else if (needs_restart && (ret == BLK_STS_RESOURCE))
+			blk_mq_delay_run_hw_queue(hctx, BLK_MQ_RESOURCE_DELAY);
 	}
 
 	return (queued + errors) != 0;
@@ -1764,6 +1775,7 @@ static blk_status_t __blk_mq_issue_directly(struct blk_mq_hw_ctx *hctx,
 		*cookie = new_cookie;
 		break;
 	case BLK_STS_RESOURCE:
+	case BLK_STS_DEV_RESOURCE:
 		__blk_mq_requeue_request(rq);
 		break;
 	default:
@@ -1826,7 +1838,7 @@ static void blk_mq_try_issue_directly(struct blk_mq_hw_ctx *hctx,
 	hctx_lock(hctx, &srcu_idx);
 
 	ret = __blk_mq_try_issue_directly(hctx, rq, cookie, false);
-	if (ret == BLK_STS_RESOURCE)
+	if (ret == BLK_STS_RESOURCE || ret == BLK_STS_DEV_RESOURCE)
 		blk_mq_sched_insert_request(rq, false, true, false);
 	else if (ret != BLK_STS_OK)
 		blk_mq_end_request(rq, ret);
diff --git a/drivers/block/null_blk.c b/drivers/block/null_blk.c
index 6655893a3a7a8365a5feb4f035b65021d38f3847..287a09611c0f8addd756c41d4733b6773f25badc 100644
--- a/drivers/block/null_blk.c
+++ b/drivers/block/null_blk.c
@@ -1230,7 +1230,7 @@ static blk_status_t null_handle_cmd(struct nullb_cmd *cmd)
 				return BLK_STS_OK;
 			} else
 				/* requeue request */
-				return BLK_STS_RESOURCE;
+				return BLK_STS_DEV_RESOURCE;
 		}
 	}
 
diff --git a/drivers/block/skd_main.c b/drivers/block/skd_main.c
index de0d08133c7ee071e0ce44167fda6e8a0e1d9c51..e41935ab41ef168bee120ec5c3a16241c9317b73 100644
--- a/drivers/block/skd_main.c
+++ b/drivers/block/skd_main.c
@@ -32,7 +32,6 @@
 #include <linux/aer.h>
 #include <linux/wait.h>
 #include <linux/stringify.h>
-#include <linux/slab_def.h>
 #include <scsi/scsi.h>
 #include <scsi/sg.h>
 #include <linux/io.h>
@@ -2603,7 +2602,8 @@ static void *skd_alloc_dma(struct skd_device *skdev, struct kmem_cache *s,
 	buf = kmem_cache_alloc(s, gfp);
 	if (!buf)
 		return NULL;
-	*dma_handle = dma_map_single(dev, buf, s->size, dir);
+	*dma_handle = dma_map_single(dev, buf,
+				     kmem_cache_size(s), dir);
 	if (dma_mapping_error(dev, *dma_handle)) {
 		kmem_cache_free(s, buf);
 		buf = NULL;
@@ -2618,7 +2618,8 @@ static void skd_free_dma(struct skd_device *skdev, struct kmem_cache *s,
 	if (!vaddr)
 		return;
 
-	dma_unmap_single(&skdev->pdev->dev, dma_handle, s->size, dir);
+	dma_unmap_single(&skdev->pdev->dev, dma_handle,
+			 kmem_cache_size(s), dir);
 	kmem_cache_free(s, vaddr);
 }
 
diff --git a/drivers/block/virtio_blk.c b/drivers/block/virtio_blk.c
index 68846897d2139baa1208619469a4ef15385d7aa8..79908e6ddbf2605d2a0ee67e721ffd6048e75aa7 100644
--- a/drivers/block/virtio_blk.c
+++ b/drivers/block/virtio_blk.c
@@ -276,7 +276,7 @@ static blk_status_t virtio_queue_rq(struct blk_mq_hw_ctx *hctx,
 		/* Out of mem doesn't actually happen, since we fall back
 		 * to direct descriptors */
 		if (err == -ENOMEM || err == -ENOSPC)
-			return BLK_STS_RESOURCE;
+			return BLK_STS_DEV_RESOURCE;
 		return BLK_STS_IOERR;
 	}
 
diff --git a/drivers/block/xen-blkfront.c b/drivers/block/xen-blkfront.c
index 891265acb10ec3c0af6a2ddc9e76c14ccc70cfb2..e126e4cac2ca499566da91a6e3da01d0b1e4381e 100644
--- a/drivers/block/xen-blkfront.c
+++ b/drivers/block/xen-blkfront.c
@@ -911,7 +911,7 @@ static blk_status_t blkif_queue_rq(struct blk_mq_hw_ctx *hctx,
 out_busy:
 	blk_mq_stop_hw_queue(hctx);
 	spin_unlock_irqrestore(&rinfo->ring_lock, flags);
-	return BLK_STS_RESOURCE;
+	return BLK_STS_DEV_RESOURCE;
 }
 
 static void blkif_complete_rq(struct request *rq)
diff --git a/drivers/md/dm-rq.c b/drivers/md/dm-rq.c
index aeaaaef43effda534a338f28d8ca660513994e21..bf0b840645cc8b64c522e99265881ddaf98ed93e 100644
--- a/drivers/md/dm-rq.c
+++ b/drivers/md/dm-rq.c
@@ -408,7 +408,7 @@ static blk_status_t dm_dispatch_clone_request(struct request *clone, struct requ
 
 	clone->start_time = jiffies;
 	r = blk_insert_cloned_request(clone->q, clone);
-	if (r != BLK_STS_OK && r != BLK_STS_RESOURCE)
+	if (r != BLK_STS_OK && r != BLK_STS_RESOURCE && r != BLK_STS_DEV_RESOURCE)
 		/* must complete clone in terms of original request */
 		dm_complete_request(rq, r);
 	return r;
@@ -500,7 +500,7 @@ static int map_request(struct dm_rq_target_io *tio)
 		trace_block_rq_remap(clone->q, clone, disk_devt(dm_disk(md)),
 				     blk_rq_pos(rq));
 		ret = dm_dispatch_clone_request(clone, rq);
-		if (ret == BLK_STS_RESOURCE) {
+		if (ret == BLK_STS_RESOURCE || ret == BLK_STS_DEV_RESOURCE) {
 			blk_rq_unprep_clone(clone);
 			tio->ti->type->release_clone_rq(clone);
 			tio->clone = NULL;
@@ -772,7 +772,6 @@ static blk_status_t dm_mq_queue_rq(struct blk_mq_hw_ctx *hctx,
 		/* Undo dm_start_request() before requeuing */
 		rq_end_stats(md, rq);
 		rq_completed(md, rq_data_dir(rq), false);
-		blk_mq_delay_run_hw_queue(hctx, 100/*ms*/);
 		return BLK_STS_RESOURCE;
 	}
 
diff --git a/drivers/nvme/host/fc.c b/drivers/nvme/host/fc.c
index 99bf51c7e51325e25ead4f4bb0dfbeff7972e526..b856d7c919d298062e2e55d8495ca18891d4f0f2 100644
--- a/drivers/nvme/host/fc.c
+++ b/drivers/nvme/host/fc.c
@@ -35,8 +35,6 @@ enum nvme_fc_queue_flags {
 	NVME_FC_Q_LIVE,
 };
 
-#define NVMEFC_QUEUE_DELAY	3		/* ms units */
-
 #define NVME_FC_DEFAULT_DEV_LOSS_TMO	60	/* seconds */
 
 struct nvme_fc_queue {
@@ -2231,7 +2229,7 @@ nvme_fc_start_fcp_op(struct nvme_fc_ctrl *ctrl, struct nvme_fc_queue *queue,
 	 * the target device is present
 	 */
 	if (ctrl->rport->remoteport.port_state != FC_OBJSTATE_ONLINE)
-		goto busy;
+		return BLK_STS_RESOURCE;
 
 	if (!nvme_fc_ctrl_get(ctrl))
 		return BLK_STS_IOERR;
@@ -2311,16 +2309,10 @@ nvme_fc_start_fcp_op(struct nvme_fc_ctrl *ctrl, struct nvme_fc_queue *queue,
 				ret != -EBUSY)
 			return BLK_STS_IOERR;
 
-		goto busy;
+		return BLK_STS_RESOURCE;
 	}
 
 	return BLK_STS_OK;
-
-busy:
-	if (!(op->flags & FCOP_FLAGS_AEN) && queue->hctx)
-		blk_mq_delay_run_hw_queue(queue->hctx, NVMEFC_QUEUE_DELAY);
-
-	return BLK_STS_RESOURCE;
 }
 
 static inline blk_status_t nvme_fc_is_ready(struct nvme_fc_queue *queue,
diff --git a/drivers/scsi/scsi_lib.c b/drivers/scsi/scsi_lib.c
index 8efe4731ed8986fc4c8d72d4513aa64324afd3c1..a86df9ca7d1c88aceb1d1e2298f48fbdb8bb3c49 100644
--- a/drivers/scsi/scsi_lib.c
+++ b/drivers/scsi/scsi_lib.c
@@ -2047,9 +2047,9 @@ static blk_status_t scsi_queue_rq(struct blk_mq_hw_ctx *hctx,
 	case BLK_STS_OK:
 		break;
 	case BLK_STS_RESOURCE:
-		if (atomic_read(&sdev->device_busy) == 0 &&
-		    !scsi_device_blocked(sdev))
-			blk_mq_delay_run_hw_queue(hctx, SCSI_QUEUE_DELAY);
+		if (atomic_read(&sdev->device_busy) ||
+		    scsi_device_blocked(sdev))
+			ret = BLK_STS_DEV_RESOURCE;
 		break;
 	default:
 		/*
diff --git a/include/linux/blk_types.h b/include/linux/blk_types.h
index c5d3db0d83f8ac1adf177f0f92c7bf3ed0e2c261..bf18b95ed92d566466709b95d826fa64fb640fb8 100644
--- a/include/linux/blk_types.h
+++ b/include/linux/blk_types.h
@@ -39,6 +39,24 @@ typedef u8 __bitwise blk_status_t;
 
 #define BLK_STS_AGAIN		((__force blk_status_t)12)
 
+/*
+ * BLK_STS_DEV_RESOURCE is returned from the driver to the block layer if
+ * device related resources are unavailable, but the driver can guarantee
+ * that the queue will be rerun in the future once resources become
+ * available again. This is typically the case for device specific
+ * resources that are consumed for IO. If the driver fails allocating these
+ * resources, we know that inflight (or pending) IO will free these
+ * resource upon completion.
+ *
+ * This is different from BLK_STS_RESOURCE in that it explicitly references
+ * a device specific resource. For resources of wider scope, allocation
+ * failure can happen without having pending IO. This means that we can't
+ * rely on request completions freeing these resources, as IO may not be in
+ * flight. Examples of that are kernel memory allocations, DMA mappings, or
+ * any other system wide resources.
+ */
+#define BLK_STS_DEV_RESOURCE	((__force blk_status_t)13)
+
 /**
  * blk_path_error - returns true if error may be path related
  * @error: status the request was completed with
diff --git a/include/linux/buffer_head.h b/include/linux/buffer_head.h
index 58a82f58e44e851e00c1413d7f66d8a931915a21..894e5d125de67fb030a59396685b803cb66a753e 100644
--- a/include/linux/buffer_head.h
+++ b/include/linux/buffer_head.h
@@ -81,11 +81,14 @@ struct buffer_head {
 /*
  * macro tricks to expand the set_buffer_foo(), clear_buffer_foo()
  * and buffer_foo() functions.
+ * To avoid reset buffer flags that are already set, because that causes
+ * a costly cache line transition, check the flag first.
  */
 #define BUFFER_FNS(bit, name)						\
 static __always_inline void set_buffer_##name(struct buffer_head *bh)	\
 {									\
-	set_bit(BH_##bit, &(bh)->b_state);				\
+	if (!test_bit(BH_##bit, &(bh)->b_state))			\
+		set_bit(BH_##bit, &(bh)->b_state);			\
 }									\
 static __always_inline void clear_buffer_##name(struct buffer_head *bh)	\
 {									\