diff --git a/fs/iomap/buffered-io.c b/fs/iomap/buffered-io.c
index c938bbad075e1ff72631ff4e93728d395f237126..6c51a75d0be612b71f755a15118556b865eefa94 100644
--- a/fs/iomap/buffered-io.c
+++ b/fs/iomap/buffered-io.c
@@ -21,6 +21,8 @@
 
 #include "../internal.h"
 
+#define IOEND_BATCH_SIZE	4096
+
 /*
  * Structure allocated for each folio when block size < folio size
  * to track sub-folio uptodate status and I/O completions.
@@ -1039,7 +1041,7 @@ static void iomap_finish_folio_write(struct inode *inode, struct folio *folio,
  * state, release holds on bios, and finally free up memory.  Do not use the
  * ioend after this.
  */
-static void
+static u32
 iomap_finish_ioend(struct iomap_ioend *ioend, int error)
 {
 	struct inode *inode = ioend->io_inode;
@@ -1048,6 +1050,7 @@ iomap_finish_ioend(struct iomap_ioend *ioend, int error)
 	u64 start = bio->bi_iter.bi_sector;
 	loff_t offset = ioend->io_offset;
 	bool quiet = bio_flagged(bio, BIO_QUIET);
+	u32 folio_count = 0;
 
 	for (bio = &ioend->io_inline_bio; bio; bio = next) {
 		struct folio_iter fi;
@@ -1062,9 +1065,11 @@ iomap_finish_ioend(struct iomap_ioend *ioend, int error)
 			next = bio->bi_private;
 
 		/* walk all folios in bio, ending page IO on them */
-		bio_for_each_folio_all(fi, bio)
+		bio_for_each_folio_all(fi, bio) {
 			iomap_finish_folio_write(inode, fi.folio, fi.length,
 					error);
+			folio_count++;
+		}
 		bio_put(bio);
 	}
 	/* The ioend has been freed by bio_put() */
@@ -1074,20 +1079,36 @@ iomap_finish_ioend(struct iomap_ioend *ioend, int error)
 "%s: writeback error on inode %lu, offset %lld, sector %llu",
 			inode->i_sb->s_id, inode->i_ino, offset, start);
 	}
+	return folio_count;
 }
 
+/*
+ * Ioend completion routine for merged bios. This can only be called from task
+ * contexts as merged ioends can be of unbound length. Hence we have to break up
+ * the writeback completions into manageable chunks to avoid long scheduler
+ * holdoffs. We aim to keep scheduler holdoffs down below 10ms so that we get
+ * good batch processing throughput without creating adverse scheduler latency
+ * conditions.
+ */
 void
 iomap_finish_ioends(struct iomap_ioend *ioend, int error)
 {
 	struct list_head tmp;
+	u32 completions;
+
+	might_sleep();
 
 	list_replace_init(&ioend->io_list, &tmp);
-	iomap_finish_ioend(ioend, error);
+	completions = iomap_finish_ioend(ioend, error);
 
 	while (!list_empty(&tmp)) {
+		if (completions > IOEND_BATCH_SIZE * 8) {
+			cond_resched();
+			completions = 0;
+		}
 		ioend = list_first_entry(&tmp, struct iomap_ioend, io_list);
 		list_del_init(&ioend->io_list);
-		iomap_finish_ioend(ioend, error);
+		completions += iomap_finish_ioend(ioend, error);
 	}
 }
 EXPORT_SYMBOL_GPL(iomap_finish_ioends);
@@ -1108,6 +1129,18 @@ iomap_ioend_can_merge(struct iomap_ioend *ioend, struct iomap_ioend *next)
 		return false;
 	if (ioend->io_offset + ioend->io_size != next->io_offset)
 		return false;
+	/*
+	 * Do not merge physically discontiguous ioends. The filesystem
+	 * completion functions will have to iterate the physical
+	 * discontiguities even if we merge the ioends at a logical level, so
+	 * we don't gain anything by merging physical discontiguities here.
+	 *
+	 * We cannot use bio->bi_iter.bi_sector here as it is modified during
+	 * submission so does not point to the start sector of the bio at
+	 * completion.
+	 */
+	if (ioend->io_sector + (ioend->io_size >> 9) != next->io_sector)
+		return false;
 	return true;
 }
 
@@ -1209,8 +1242,10 @@ iomap_alloc_ioend(struct inode *inode, struct iomap_writepage_ctx *wpc,
 	ioend->io_flags = wpc->iomap.flags;
 	ioend->io_inode = inode;
 	ioend->io_size = 0;
+	ioend->io_folios = 0;
 	ioend->io_offset = offset;
 	ioend->io_bio = bio;
+	ioend->io_sector = sector;
 	return ioend;
 }
 
@@ -1251,6 +1286,13 @@ iomap_can_add_to_ioend(struct iomap_writepage_ctx *wpc, loff_t offset,
 		return false;
 	if (sector != bio_end_sector(wpc->ioend->io_bio))
 		return false;
+	/*
+	 * Limit ioend bio chain lengths to minimise IO completion latency. This
+	 * also prevents long tight loops ending page writeback on all the
+	 * folios in the ioend.
+	 */
+	if (wpc->ioend->io_folios >= IOEND_BATCH_SIZE)
+		return false;
 	return true;
 }
 
@@ -1335,6 +1377,8 @@ iomap_writepage_map(struct iomap_writepage_ctx *wpc,
 				 &submit_list);
 		count++;
 	}
+	if (count)
+		wpc->ioend->io_folios++;
 
 	WARN_ON_ONCE(!wpc->ioend && !list_empty(&submit_list));
 	WARN_ON_ONCE(!folio_test_locked(folio));
diff --git a/fs/xfs/xfs_aops.c b/fs/xfs/xfs_aops.c
index 2705f91bdd0d7cdf1508ab98bf934e0e854b8e40..9d6a67c7d2271a49cb0783101e5ef643babffc56 100644
--- a/fs/xfs/xfs_aops.c
+++ b/fs/xfs/xfs_aops.c
@@ -136,7 +136,20 @@ xfs_end_ioend(
 	memalloc_nofs_restore(nofs_flag);
 }
 
-/* Finish all pending io completions. */
+/*
+ * Finish all pending IO completions that require transactional modifications.
+ *
+ * We try to merge physical and logically contiguous ioends before completion to
+ * minimise the number of transactions we need to perform during IO completion.
+ * Both unwritten extent conversion and COW remapping need to iterate and modify
+ * one physical extent at a time, so we gain nothing by merging physically
+ * discontiguous extents here.
+ *
+ * The ioend chain length that we can be processing here is largely unbound in
+ * length and we may have to perform significant amounts of work on each ioend
+ * to complete it. Hence we have to be careful about holding the CPU for too
+ * long in this loop.
+ */
 void
 xfs_end_io(
 	struct work_struct	*work)
@@ -157,6 +170,7 @@ xfs_end_io(
 		list_del_init(&ioend->io_list);
 		iomap_ioend_try_merge(ioend, &tmp);
 		xfs_end_ioend(ioend);
+		cond_resched();
 	}
 }
 
diff --git a/include/linux/iomap.h b/include/linux/iomap.h
index b55bd49e55f51e6e6084fc2710330994e6d20213..97a3a2edb58505bc37fe587aa46f456c16c3e46b 100644
--- a/include/linux/iomap.h
+++ b/include/linux/iomap.h
@@ -263,9 +263,11 @@ struct iomap_ioend {
 	struct list_head	io_list;	/* next ioend in chain */
 	u16			io_type;
 	u16			io_flags;	/* IOMAP_F_* */
+	u32			io_folios;	/* folios added to ioend */
 	struct inode		*io_inode;	/* file being written to */
 	size_t			io_size;	/* size of the extent */
 	loff_t			io_offset;	/* offset in the file */
+	sector_t		io_sector;	/* start sector of ioend */
 	struct bio		*io_bio;	/* bio being built */
 	struct bio		io_inline_bio;	/* MUST BE LAST! */
 };