diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h index 6894976b54e376da3c203932ec2c1152ddfc527d..7a5040d8614ceab93f526d7e72342e3b0e66a2ac 100644 --- a/include/linux/ptr_ring.h +++ b/include/linux/ptr_ring.h @@ -31,16 +31,25 @@ #include <asm/errno.h> #endif +/* entries must start with the following structure */ +struct plist { + struct plist *next; + struct plist *last; /* only valid in the 1st entry */ +}; + struct ptr_ring { int producer ____cacheline_aligned_in_smp; spinlock_t producer_lock; int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */ int consumer_tail; /* next entry to invalidate */ + struct plist *consumer_list; + int list_num; spinlock_t consumer_lock; /* Shared consumer/producer data */ /* Read-only by both the producer and the consumer */ int size ____cacheline_aligned_in_smp; /* max entries in queue */ int batch; /* number of entries to consume in a batch */ + int list_size; void **queue; }; @@ -121,10 +130,42 @@ static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr) } /* - * Note: resize (below) nests producer lock within consumer lock, so if you - * consume in interrupt or BH context, you must disable interrupts/BH when - * calling this. + * Note: resize API with the _fallback should be used when calling this. */ +static inline int ptr_ring_produce_fallback(struct ptr_ring *r, void *ptr) +{ + int ret; + unsigned long flags; + struct plist *p = ptr; + + p->next = NULL; + p->last = p; + + spin_lock_irqsave(&r->producer_lock, flags); + ret = __ptr_ring_produce(r, ptr); + if (ret && r->list_size) { + spin_lock(&r->consumer_lock); + ret = __ptr_ring_produce(r, ptr); + if (ret && r->list_num < r->list_size) { + int producer = r->producer ? r->producer - 1 : + r->size - 1; + struct plist *first = r->queue[producer]; + + BUG_ON(!first); + + first->last->next = p; + first->last = p; + + r->list_num++; + } + spin_unlock(&r->consumer_lock); + } + + spin_unlock_irqrestore(&r->producer_lock, flags); + + return ret; +} + static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr) { int ret; @@ -136,6 +177,7 @@ static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr) return ret; } + static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr) { int ret; @@ -373,6 +415,27 @@ static inline void *ptr_ring_consume_bh(struct ptr_ring *r) return ptr; } +static inline void *ptr_ring_consume_fallback(struct ptr_ring *r) +{ + unsigned long flags; + struct plist *ptr; + + spin_lock_irqsave(&r->consumer_lock, flags); + if (r->consumer_list) { + ptr = r->consumer_list; + r->consumer_list = ptr->next; + r->list_num--; + } else { + ptr = __ptr_ring_consume(r); + if (ptr) { + r->consumer_list = ptr->next; + } + } + spin_unlock_irqrestore(&r->consumer_lock, flags); + + return ptr; +} + static inline int ptr_ring_consume_batched(struct ptr_ring *r, void **array, int n) { @@ -488,7 +551,8 @@ static inline void __ptr_ring_set_size(struct ptr_ring *r, int size) r->batch = 1; } -static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp) +static inline int ptr_ring_init_fallback(struct ptr_ring *r, int size, gfp_t gfp, + int fallback_size) { r->queue = __ptr_ring_init_queue_alloc(size, gfp); if (!r->queue) @@ -498,10 +562,17 @@ static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp) r->producer = r->consumer_head = r->consumer_tail = 0; spin_lock_init(&r->producer_lock); spin_lock_init(&r->consumer_lock); + r->list_size = fallback_size; + r->consumer_list = NULL; return 0; } +static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp) +{ + return ptr_ring_init_fallback(r, size, gfp, 0); +} + /* * Return entries into ring. Destroy entries that don't fit. *