File: /usr/src/linux/net/sunrpc/sched.c

1     /*
2      * linux/net/sunrpc/sched.c
3      *
4      * Scheduling for synchronous and asynchronous RPC requests.
5      *
6      * Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
7      * 
8      * TCP NFS related read + write fixes
9      * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
10      */
11     
12     #include <linux/module.h>
13     
14     #define __KERNEL_SYSCALLS__
15     #include <linux/sched.h>
16     #include <linux/interrupt.h>
17     #include <linux/slab.h>
18     #include <linux/unistd.h>
19     #include <linux/smp.h>
20     #include <linux/smp_lock.h>
21     #include <linux/spinlock.h>
22     
23     #include <linux/sunrpc/clnt.h>
24     
#ifdef RPC_DEBUG
# define RPCDBG_FACILITY	RPCDBG_SCHED
/* Counter from which rpc_init_task() hands out tk_pid values. */
static int rpc_task_id;
#endif

/* RPC allocations get the same get_free_pages priority as NFS. */
#define GFP_RPC		GFP_NFS

/* Forward declarations for helpers defined further down. */
static void __rpc_default_timer(struct rpc_task *task);
static void rpciod_killall(void);
37     
38     /*
39      * When an asynchronous RPC task is activated within a bottom half
40      * handler, or while executing another RPC task, it is put on
41      * schedq, and rpciod is woken up.
42      */
43     static struct rpc_wait_queue	schedq = RPC_INIT_WAITQ("schedq");
44     
45     /*
46      * RPC tasks that create another task (e.g. for contacting the portmapper)
47      * will wait on this queue for their child's completion
48      */
49     static struct rpc_wait_queue	childq = RPC_INIT_WAITQ("childq");
50     
51     /*
52      * RPC tasks sit here while waiting for conditions to improve.
53      */
54     static struct rpc_wait_queue	delay_queue = RPC_INIT_WAITQ("delayq");
55     
56     /*
57      * All RPC tasks are linked into this list
58      */
59     static struct rpc_task *	all_tasks;
60     
61     /*
62      * rpciod-related stuff
63      */
64     static DECLARE_WAIT_QUEUE_HEAD(rpciod_idle);
65     static DECLARE_WAIT_QUEUE_HEAD(rpciod_killer);
66     static DECLARE_MUTEX(rpciod_sema);
67     static unsigned int		rpciod_users;
68     static pid_t			rpciod_pid;
69     static int			rpc_inhibit;
70     
71     /*
72      * Spinlock for wait queues. Access to the latter also has to be
73      * interrupt-safe in order to allow timers to wake up sleeping tasks.
74      */
75     spinlock_t rpc_queue_lock = SPIN_LOCK_UNLOCKED;
76     /*
77      * Spinlock for other critical sections of code.
78      */
79     static spinlock_t rpc_sched_lock = SPIN_LOCK_UNLOCKED;
80     
81     /*
82      * This is the last-ditch buffer for NFS swap requests
83      */
84     static u32			swap_buffer[PAGE_SIZE >> 2];
85     static long			swap_buffer_used;
86     
87     /*
88      * Make allocation of the swap_buffer SMP-safe
89      */
90     static __inline__ int rpc_lock_swapbuf(void)
91     {
92     	return !test_and_set_bit(1, &swap_buffer_used);
93     }
94     static __inline__ void rpc_unlock_swapbuf(void)
95     {
96     	clear_bit(1, &swap_buffer_used);
97     }
98     
99     /*
100      * Disable the timer for a given RPC task. Should be called with
101      * rpc_queue_lock and bh_disabled in order to avoid races within
102      * rpc_run_timer().
103      */
104     static inline void
105     __rpc_disable_timer(struct rpc_task *task)
106     {
107     	dprintk("RPC: %4d disabling timer\n", task->tk_pid);
108     	task->tk_timeout_fn = NULL;
109     	task->tk_timeout = 0;
110     }
111     
112     /*
113      * Run a timeout function.
114      * We use the callback in order to allow __rpc_wake_up_task()
115      * and friends to disable the timer synchronously on SMP systems
116      * without calling del_timer_sync(). The latter could cause a
117      * deadlock if called while we're holding spinlocks...
118      */
119     static void
120     rpc_run_timer(struct rpc_task *task)
121     {
122     	void (*callback)(struct rpc_task *);
123     
124     	spin_lock_bh(&rpc_queue_lock);
125     	callback = task->tk_timeout_fn;
126     	task->tk_timeout_fn = NULL;
127     	spin_unlock_bh(&rpc_queue_lock);
128     	if (callback) {
129     		dprintk("RPC: %4d running timer\n", task->tk_pid);
130     		callback(task);
131     	}
132     }
133     
134     /*
135      * Set up a timer for the current task.
136      */
137     static inline void
138     __rpc_add_timer(struct rpc_task *task, rpc_action timer)
139     {
140     	if (!task->tk_timeout)
141     		return;
142     
143     	dprintk("RPC: %4d setting alarm for %lu ms\n",
144     			task->tk_pid, task->tk_timeout * 1000 / HZ);
145     
146     	if (timer)
147     		task->tk_timeout_fn = timer;
148     	else
149     		task->tk_timeout_fn = __rpc_default_timer;
150     	mod_timer(&task->tk_timer, jiffies + task->tk_timeout);
151     }
152     
153     /*
154      * Set up a timer for an already sleeping task.
155      */
156     void rpc_add_timer(struct rpc_task *task, rpc_action timer)
157     {
158     	spin_lock_bh(&rpc_queue_lock);
159     	if (!(RPC_IS_RUNNING(task) || task->tk_wakeup))
160     		__rpc_add_timer(task, timer);
161     	spin_unlock_bh(&rpc_queue_lock);
162     }
163     
164     /*
165      * Delete any timer for the current task. Because we use del_timer_sync(),
166      * this function should never be called while holding rpc_queue_lock.
167      */
168     static inline void
169     rpc_delete_timer(struct rpc_task *task)
170     {
171     	if (timer_pending(&task->tk_timer)) {
172     		dprintk("RPC: %4d deleting timer\n", task->tk_pid);
173     		del_timer_sync(&task->tk_timer);
174     	}
175     }
176     
177     /*
178      * Add new request to wait queue.
179      *
180      * Swapper tasks always get inserted at the head of the queue.
181      * This should avoid many nasty memory deadlocks and hopefully
182      * improve overall performance.
183      * Everyone else gets appended to the queue to ensure proper FIFO behavior.
184      */
185     static inline int
186     __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
187     {
188     	if (task->tk_rpcwait == queue)
189     		return 0;
190     
191     	if (task->tk_rpcwait) {
192     		printk(KERN_WARNING "RPC: doubly enqueued task!\n");
193     		return -EWOULDBLOCK;
194     	}
195     	if (RPC_IS_SWAPPER(task))
196     		rpc_insert_list(&queue->task, task);
197     	else
198     		rpc_append_list(&queue->task, task);
199     	task->tk_rpcwait = queue;
200     
201     	dprintk("RPC: %4d added to queue %p \"%s\"\n",
202     				task->tk_pid, queue, rpc_qname(queue));
203     
204     	return 0;
205     }
206     
207     int
208     rpc_add_wait_queue(struct rpc_wait_queue *q, struct rpc_task *task)
209     {
210     	int		result;
211     
212     	spin_lock_bh(&rpc_queue_lock);
213     	result = __rpc_add_wait_queue(q, task);
214     	spin_unlock_bh(&rpc_queue_lock);
215     	return result;
216     }
217     
218     /*
219      * Remove request from queue.
220      * Note: must be called with spin lock held.
221      */
222     static inline void
223     __rpc_remove_wait_queue(struct rpc_task *task)
224     {
225     	struct rpc_wait_queue *queue = task->tk_rpcwait;
226     
227     	if (!queue)
228     		return;
229     
230     	rpc_remove_list(&queue->task, task);
231     	task->tk_rpcwait = NULL;
232     
233     	dprintk("RPC: %4d removed from queue %p \"%s\"\n",
234     				task->tk_pid, queue, rpc_qname(queue));
235     }
236     
237     void
238     rpc_remove_wait_queue(struct rpc_task *task)
239     {
240     	if (!task->tk_rpcwait)
241     		return;
242     	spin_lock_bh(&rpc_queue_lock);
243     	__rpc_remove_wait_queue(task);
244     	spin_unlock_bh(&rpc_queue_lock);
245     }
246     
247     /*
248      * Make an RPC task runnable.
249      *
250      * Note: If the task is ASYNC, this must be called with 
251      * the spinlock held to protect the wait queue operation.
252      */
253     static inline void
254     rpc_make_runnable(struct rpc_task *task)
255     {
256     	if (task->tk_timeout_fn) {
257     		printk(KERN_ERR "RPC: task w/ running timer in rpc_make_runnable!!\n");
258     		return;
259     	}
260     	rpc_set_running(task);
261     	if (RPC_IS_ASYNC(task)) {
262     		if (RPC_IS_SLEEPING(task)) {
263     			int status;
264     			status = __rpc_add_wait_queue(&schedq, task);
265     			if (status < 0) {
266     				printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
267     				task->tk_status = status;
268     				return;
269     			}
270     			rpc_clear_sleeping(task);
271     			if (waitqueue_active(&rpciod_idle))
272     				wake_up(&rpciod_idle);
273     		}
274     	} else {
275     		rpc_clear_sleeping(task);
276     		if (waitqueue_active(&task->tk_wait))
277     			wake_up(&task->tk_wait);
278     	}
279     }
280     
281     /*
282      * Place a newly initialized task on the schedq.
283      */
284     static inline void
285     rpc_schedule_run(struct rpc_task *task)
286     {
287     	/* Don't run a child twice! */
288     	if (RPC_IS_ACTIVATED(task))
289     		return;
290     	task->tk_active = 1;
291     	rpc_set_sleeping(task);
292     	rpc_make_runnable(task);
293     }
294     
295     /*
296      *	For other people who may need to wake the I/O daemon
297      *	but should (for now) know nothing about its innards
298      */
299     void rpciod_wake_up(void)
300     {
301     	if(rpciod_pid==0)
302     		printk(KERN_ERR "rpciod: wot no daemon?\n");
303     	if (waitqueue_active(&rpciod_idle))
304     		wake_up(&rpciod_idle);
305     }
306     
307     /*
308      * Prepare for sleeping on a wait queue.
309      * By always appending tasks to the list we ensure FIFO behavior.
310      * NB: An RPC task will only receive interrupt-driven events as long
311      * as it's on a wait queue.
312      */
313     static void
314     __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
315     			rpc_action action, rpc_action timer)
316     {
317     	int status;
318     
319     	dprintk("RPC: %4d sleep_on(queue \"%s\" time %ld)\n", task->tk_pid,
320     				rpc_qname(q), jiffies);
321     
322     	if (!RPC_IS_ASYNC(task) && !RPC_IS_ACTIVATED(task)) {
323     		printk(KERN_ERR "RPC: Inactive synchronous task put to sleep!\n");
324     		return;
325     	}
326     
327     	/* Mark the task as being activated if so needed */
328     	if (!RPC_IS_ACTIVATED(task)) {
329     		task->tk_active = 1;
330     		rpc_set_sleeping(task);
331     	}
332     
333     	status = __rpc_add_wait_queue(q, task);
334     	if (status) {
335     		printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
336     		task->tk_status = status;
337     	} else {
338     		rpc_clear_running(task);
339     		if (task->tk_callback) {
340     			dprintk(KERN_ERR "RPC: %4d overwrites an active callback\n", task->tk_pid);
341     			BUG();
342     		}
343     		task->tk_callback = action;
344     		__rpc_add_timer(task, timer);
345     	}
346     }
347     
348     void
349     rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
350     				rpc_action action, rpc_action timer)
351     {
352     	/*
353     	 * Protect the queue operations.
354     	 */
355     	spin_lock_bh(&rpc_queue_lock);
356     	__rpc_sleep_on(q, task, action, timer);
357     	spin_unlock_bh(&rpc_queue_lock);
358     }
359     
360     void
361     rpc_sleep_locked(struct rpc_wait_queue *q, struct rpc_task *task,
362     		 rpc_action action, rpc_action timer)
363     {
364     	/*
365     	 * Protect the queue operations.
366     	 */
367     	spin_lock_bh(&rpc_queue_lock);
368     	__rpc_sleep_on(q, task, action, timer);
369     	__rpc_lock_task(task);
370     	spin_unlock_bh(&rpc_queue_lock);
371     }
372     
373     /**
374      * __rpc_wake_up_task - wake up a single rpc_task
375      * @task: task to be woken up
376      *
377      * If the task is locked, it is merely removed from the queue, and
378      * 'task->tk_wakeup' is set. rpc_unlock_task() will then ensure
379      * that it is woken up as soon as the lock count goes to zero.
380      *
381      * Caller must hold rpc_queue_lock
382      */
383     static void
384     __rpc_wake_up_task(struct rpc_task *task)
385     {
386     	dprintk("RPC: %4d __rpc_wake_up_task (now %ld inh %d)\n",
387     					task->tk_pid, jiffies, rpc_inhibit);
388     
389     #ifdef RPC_DEBUG
390     	if (task->tk_magic != 0xf00baa) {
391     		printk(KERN_ERR "RPC: attempt to wake up non-existing task!\n");
392     		rpc_debug = ~0;
393     		rpc_show_tasks();
394     		return;
395     	}
396     #endif
397     	/* Has the task been executed yet? If not, we cannot wake it up! */
398     	if (!RPC_IS_ACTIVATED(task)) {
399     		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
400     		return;
401     	}
402     	if (RPC_IS_RUNNING(task))
403     		return;
404     
405     	__rpc_disable_timer(task);
406     	if (task->tk_rpcwait != &schedq)
407     		__rpc_remove_wait_queue(task);
408     
409     	/* If the task has been locked, then set tk_wakeup so that
410     	 * rpc_unlock_task() wakes us up... */
411     	if (task->tk_lock) {
412     		task->tk_wakeup = 1;
413     		return;
414     	} else
415     		task->tk_wakeup = 0;
416     
417     	rpc_make_runnable(task);
418     
419     	dprintk("RPC:      __rpc_wake_up_task done\n");
420     }
421     
422     /*
423      * Default timeout handler if none specified by user
424      */
425     static void
426     __rpc_default_timer(struct rpc_task *task)
427     {
428     	dprintk("RPC: %d timeout (default timer)\n", task->tk_pid);
429     	task->tk_status = -ETIMEDOUT;
430     	rpc_wake_up_task(task);
431     }
432     
433     /*
434      * Wake up the specified task
435      */
436     void
437     rpc_wake_up_task(struct rpc_task *task)
438     {
439     	if (RPC_IS_RUNNING(task))
440     		return;
441     	spin_lock_bh(&rpc_queue_lock);
442     	__rpc_wake_up_task(task);
443     	spin_unlock_bh(&rpc_queue_lock);
444     }
445     
446     /*
447      * Wake up the next task on the wait queue.
448      */
449     struct rpc_task *
450     rpc_wake_up_next(struct rpc_wait_queue *queue)
451     {
452     	struct rpc_task	*task;
453     
454     	dprintk("RPC:      wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue));
455     	spin_lock_bh(&rpc_queue_lock);
456     	if ((task = queue->task) != 0)
457     		__rpc_wake_up_task(task);
458     	spin_unlock_bh(&rpc_queue_lock);
459     
460     	return task;
461     }
462     
463     /**
464      * rpc_wake_up - wake up all rpc_tasks
465      * @queue: rpc_wait_queue on which the tasks are sleeping
466      *
467      * Grabs rpc_queue_lock
468      */
469     void
470     rpc_wake_up(struct rpc_wait_queue *queue)
471     {
472     	spin_lock_bh(&rpc_queue_lock);
473     	while (queue->task)
474     		__rpc_wake_up_task(queue->task);
475     	spin_unlock_bh(&rpc_queue_lock);
476     }
477     
478     /**
479      * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
480      * @queue: rpc_wait_queue on which the tasks are sleeping
481      * @status: status value to set
482      *
483      * Grabs rpc_queue_lock
484      */
485     void
486     rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
487     {
488     	struct rpc_task	*task;
489     
490     	spin_lock_bh(&rpc_queue_lock);
491     	while ((task = queue->task) != NULL) {
492     		task->tk_status = status;
493     		__rpc_wake_up_task(task);
494     	}
495     	spin_unlock_bh(&rpc_queue_lock);
496     }
497     
498     /*
499      * Lock down a sleeping task to prevent it from waking up
500      * and disappearing from beneath us.
501      *
502      * This function should always be called with the
503      * rpc_queue_lock held.
504      */
505     int
506     __rpc_lock_task(struct rpc_task *task)
507     {
508     	if (!RPC_IS_RUNNING(task))
509     		return ++task->tk_lock;
510     	return 0;
511     }
512     
513     void
514     rpc_unlock_task(struct rpc_task *task)
515     {
516     	spin_lock_bh(&rpc_queue_lock);
517     	if (task->tk_lock && !--task->tk_lock && task->tk_wakeup)
518     		__rpc_wake_up_task(task);
519     	spin_unlock_bh(&rpc_queue_lock);
520     }
521     
522     /*
523      * Run a task at a later time
524      */
525     static void	__rpc_atrun(struct rpc_task *);
526     void
527     rpc_delay(struct rpc_task *task, unsigned long delay)
528     {
529     	task->tk_timeout = delay;
530     	rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
531     }
532     
533     static void
534     __rpc_atrun(struct rpc_task *task)
535     {
536     	task->tk_status = 0;
537     	rpc_wake_up_task(task);
538     }
539     
540     /*
541      * This is the RPC `scheduler' (or rather, the finite state machine).
542      */
543     static int
544     __rpc_execute(struct rpc_task *task)
545     {
546     	int		status = 0;
547     
548     	dprintk("RPC: %4d rpc_execute flgs %x\n",
549     				task->tk_pid, task->tk_flags);
550     
551     	if (!RPC_IS_RUNNING(task)) {
552     		printk(KERN_WARNING "RPC: rpc_execute called for sleeping task!!\n");
553     		return 0;
554     	}
555     
556      restarted:
557     	while (1) {
558     		/*
559     		 * Execute any pending callback.
560     		 */
561     		if (RPC_DO_CALLBACK(task)) {
562     			/* Define a callback save pointer */
563     			void (*save_callback)(struct rpc_task *);
564     	
565     			/* 
566     			 * If a callback exists, save it, reset it,
567     			 * call it.
568     			 * The save is needed to stop from resetting
569     			 * another callback set within the callback handler
570     			 * - Dave
571     			 */
572     			save_callback=task->tk_callback;
573     			task->tk_callback=NULL;
574     			save_callback(task);
575     		}
576     
577     		/*
578     		 * Perform the next FSM step.
579     		 * tk_action may be NULL when the task has been killed
580     		 * by someone else.
581     		 */
582     		if (RPC_IS_RUNNING(task)) {
583     			/*
584     			 * Garbage collection of pending timers...
585     			 */
586     			rpc_delete_timer(task);
587     			if (!task->tk_action)
588     				break;
589     			task->tk_action(task);
590     		}
591     
592     		/*
593     		 * Check whether task is sleeping.
594     		 */
595     		spin_lock_bh(&rpc_queue_lock);
596     		if (!RPC_IS_RUNNING(task)) {
597     			rpc_set_sleeping(task);
598     			if (RPC_IS_ASYNC(task)) {
599     				spin_unlock_bh(&rpc_queue_lock);
600     				return 0;
601     			}
602     		}
603     		spin_unlock_bh(&rpc_queue_lock);
604     
605     		while (RPC_IS_SLEEPING(task)) {
606     			/* sync task: sleep here */
607     			dprintk("RPC: %4d sync task going to sleep\n",
608     							task->tk_pid);
609     			if (current->pid == rpciod_pid)
610     				printk(KERN_ERR "RPC: rpciod waiting on sync task!\n");
611     
612     			__wait_event(task->tk_wait, !RPC_IS_SLEEPING(task));
613     			dprintk("RPC: %4d sync task resuming\n", task->tk_pid);
614     
615     			/*
616     			 * When a sync task receives a signal, it exits with
617     			 * -ERESTARTSYS. In order to catch any callbacks that
618     			 * clean up after sleeping on some queue, we don't
619     			 * break the loop here, but go around once more.
620     			 */
621     			if (task->tk_client->cl_intr && signalled()) {
622     				dprintk("RPC: %4d got signal\n", task->tk_pid);
623     				task->tk_flags |= RPC_TASK_KILLED;
624     				rpc_exit(task, -ERESTARTSYS);
625     				rpc_wake_up_task(task);
626     			}
627     		}
628     	}
629     
630     	if (task->tk_exit) {
631     		task->tk_exit(task);
632     		/* If tk_action is non-null, the user wants us to restart */
633     		if (task->tk_action) {
634     			if (!RPC_ASSASSINATED(task)) {
635     				/* Release RPC slot and buffer memory */
636     				if (task->tk_rqstp)
637     					xprt_release(task);
638     				if (task->tk_buffer) {
639     					rpc_free(task->tk_buffer);
640     					task->tk_buffer = NULL;
641     				}
642     				goto restarted;
643     			}
644     			printk(KERN_ERR "RPC: dead task tries to walk away.\n");
645     		}
646     	}
647     
648     	dprintk("RPC: %4d exit() = %d\n", task->tk_pid, task->tk_status);
649     	status = task->tk_status;
650     
651     	/* Release all resources associated with the task */
652     	rpc_release_task(task);
653     
654     	return status;
655     }
656     
657     /*
658      * User-visible entry point to the scheduler.
659      *
660      * This may be called recursively if e.g. an async NFS task updates
661      * the attributes and finds that dirty pages must be flushed.
662      * NOTE: Upon exit of this function the task is guaranteed to be
663      *	 released. In particular note that tk_release() will have
664      *	 been called, so your task memory may have been freed.
665      */
666     int
667     rpc_execute(struct rpc_task *task)
668     {
669     	int status = -EIO;
670     	if (rpc_inhibit) {
671     		printk(KERN_INFO "RPC: execution inhibited!\n");
672     		goto out_release;
673     	}
674     
675     	status = -EWOULDBLOCK;
676     	if (task->tk_active) {
677     		printk(KERN_ERR "RPC: active task was run twice!\n");
678     		goto out_err;
679     	}
680     
681     	task->tk_active = 1;
682     	rpc_set_running(task);
683     	return __rpc_execute(task);
684      out_release:
685     	rpc_release_task(task);
686      out_err:
687     	return status;
688     }
689     
690     /*
691      * This is our own little scheduler for async RPC tasks.
692      */
693     static void
694     __rpc_schedule(void)
695     {
696     	struct rpc_task	*task;
697     	int		count = 0;
698     
699     	dprintk("RPC:      rpc_schedule enter\n");
700     	while (1) {
701     		/* Ensure equal rights for tcp tasks... */
702     		rpciod_tcp_dispatcher();
703     
704     		spin_lock_bh(&rpc_queue_lock);
705     		if (!(task = schedq.task)) {
706     			spin_unlock_bh(&rpc_queue_lock);
707     			break;
708     		}
709     		if (task->tk_lock) {
710     			spin_unlock_bh(&rpc_queue_lock);
711     			printk(KERN_ERR "RPC: Locked task was scheduled !!!!\n");
712     #ifdef RPC_DEBUG			
713     			rpc_debug = ~0;
714     			rpc_show_tasks();
715     #endif			
716     			break;
717     		}
718     		__rpc_remove_wait_queue(task);
719     		spin_unlock_bh(&rpc_queue_lock);
720     
721     		__rpc_execute(task);
722     
723     		if (++count >= 200 || current->need_resched) {
724     			count = 0;
725     			schedule();
726     		}
727     	}
728     	dprintk("RPC:      rpc_schedule leave\n");
729     }
730     
731     /*
732      * Allocate memory for RPC purpose.
733      *
734      * This is yet another tricky issue: For sync requests issued by
735      * a user process, we want to make kmalloc sleep if there isn't
736      * enough memory. Async requests should not sleep too excessively
737      * because that will block rpciod (but that's not dramatic when
738      * it's starved of memory anyway). Finally, swapout requests should
739      * never sleep at all, and should not trigger another swap_out
740      * request through kmalloc which would just increase memory contention.
741      *
742      * I hope the following gets it right, which gives async requests
743      * a slight advantage over sync requests (good for writeback, debatable
744      * for readahead):
745      *
746      *   sync user requests:	GFP_KERNEL
747      *   async requests:		GFP_RPC		(== GFP_NFS)
748      *   swap requests:		GFP_ATOMIC	(or new GFP_SWAPPER)
749      */
750     void *
751     rpc_allocate(unsigned int flags, unsigned int size)
752     {
753     	u32	*buffer;
754     	int	gfp;
755     
756     	if (flags & RPC_TASK_SWAPPER)
757     		gfp = GFP_ATOMIC;
758     	else if (flags & RPC_TASK_ASYNC)
759     		gfp = GFP_RPC;
760     	else
761     		gfp = GFP_KERNEL;
762     
763     	do {
764     		if ((buffer = (u32 *) kmalloc(size, gfp)) != NULL) {
765     			dprintk("RPC:      allocated buffer %p\n", buffer);
766     			return buffer;
767     		}
768     		if ((flags & RPC_TASK_SWAPPER) && size <= sizeof(swap_buffer)
769     		    && rpc_lock_swapbuf()) {
770     			dprintk("RPC:      used last-ditch swap buffer\n");
771     			return swap_buffer;
772     		}
773     		if (flags & RPC_TASK_ASYNC)
774     			return NULL;
775     		set_current_state(TASK_INTERRUPTIBLE);
776     		schedule_timeout(HZ>>4);
777     	} while (!signalled());
778     
779     	return NULL;
780     }
781     
782     void
783     rpc_free(void *buffer)
784     {
785     	if (buffer != swap_buffer) {
786     		kfree(buffer);
787     		return;
788     	}
789     	rpc_unlock_swapbuf();
790     }
791     
792     /*
793      * Creation and deletion of RPC task structures
794      */
795     inline void
796     rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt,
797     				rpc_action callback, int flags)
798     {
799     	memset(task, 0, sizeof(*task));
800     	init_timer(&task->tk_timer);
801     	task->tk_timer.data     = (unsigned long) task;
802     	task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
803     	task->tk_client = clnt;
804     	task->tk_flags  = flags;
805     	task->tk_exit   = callback;
806     	init_waitqueue_head(&task->tk_wait);
807     	if (current->uid != current->fsuid || current->gid != current->fsgid)
808     		task->tk_flags |= RPC_TASK_SETUID;
809     
810     	/* Initialize retry counters */
811     	task->tk_garb_retry = 2;
812     	task->tk_cred_retry = 2;
813     	task->tk_suid_retry = 1;
814     
815     	/* Add to global list of all tasks */
816     	spin_lock(&rpc_sched_lock);
817     	task->tk_next_task = all_tasks;
818     	task->tk_prev_task = NULL;
819     	if (all_tasks)
820     		all_tasks->tk_prev_task = task;
821     	all_tasks = task;
822     	spin_unlock(&rpc_sched_lock);
823     
824     	if (clnt)
825     		atomic_inc(&clnt->cl_users);
826     
827     #ifdef RPC_DEBUG
828     	task->tk_magic = 0xf00baa;
829     	task->tk_pid = rpc_task_id++;
830     #endif
831     	dprintk("RPC: %4d new task procpid %d\n", task->tk_pid,
832     				current->pid);
833     }
834     
835     static void
836     rpc_default_free_task(struct rpc_task *task)
837     {
838     	dprintk("RPC: %4d freeing task\n", task->tk_pid);
839     	rpc_free(task);
840     }
841     
842     /*
843      * Create a new task for the specified client.  We have to
844      * clean up after an allocation failure, as the client may
845      * have specified "oneshot".
846      */
847     struct rpc_task *
848     rpc_new_task(struct rpc_clnt *clnt, rpc_action callback, int flags)
849     {
850     	struct rpc_task	*task;
851     
852     	task = (struct rpc_task *) rpc_allocate(flags, sizeof(*task));
853     	if (!task)
854     		goto cleanup;
855     
856     	rpc_init_task(task, clnt, callback, flags);
857     
858     	/* Replace tk_release */
859     	task->tk_release = rpc_default_free_task;
860     
861     	dprintk("RPC: %4d allocated task\n", task->tk_pid);
862     	task->tk_flags |= RPC_TASK_DYNAMIC;
863     out:
864     	return task;
865     
866     cleanup:
867     	/* Check whether to release the client */
868     	if (clnt) {
869     		printk("rpc_new_task: failed, users=%d, oneshot=%d\n",
870     			atomic_read(&clnt->cl_users), clnt->cl_oneshot);
871     		atomic_inc(&clnt->cl_users); /* pretend we were used ... */
872     		rpc_release_client(clnt);
873     	}
874     	goto out;
875     }
876     
877     void
878     rpc_release_task(struct rpc_task *task)
879     {
880     	struct rpc_task	*next, *prev;
881     
882     	dprintk("RPC: %4d release task\n", task->tk_pid);
883     
884     #ifdef RPC_DEBUG
885     	if (task->tk_magic != 0xf00baa) {
886     		printk(KERN_ERR "RPC: attempt to release a non-existing task!\n");
887     		rpc_debug = ~0;
888     		rpc_show_tasks();
889     		return;
890     	}
891     #endif
892     
893     	/* Remove from global task list */
894     	spin_lock(&rpc_sched_lock);
895     	prev = task->tk_prev_task;
896     	next = task->tk_next_task;
897     	if (next)
898     		next->tk_prev_task = prev;
899     	if (prev)
900     		prev->tk_next_task = next;
901     	else
902     		all_tasks = next;
903     	task->tk_next_task = task->tk_prev_task = NULL;
904     	spin_unlock(&rpc_sched_lock);
905     
906     	/* Protect the execution below. */
907     	spin_lock_bh(&rpc_queue_lock);
908     
909     	/* Disable timer to prevent zombie wakeup */
910     	__rpc_disable_timer(task);
911     
912     	/* Remove from any wait queue we're still on */
913     	__rpc_remove_wait_queue(task);
914     
915     	task->tk_active = 0;
916     
917     	spin_unlock_bh(&rpc_queue_lock);
918     
919     	/* Synchronously delete any running timer */
920     	rpc_delete_timer(task);
921     
922     	/* Release resources */
923     	if (task->tk_rqstp)
924     		xprt_release(task);
925     	if (task->tk_msg.rpc_cred)
926     		rpcauth_unbindcred(task);
927     	if (task->tk_buffer) {
928     		rpc_free(task->tk_buffer);
929     		task->tk_buffer = NULL;
930     	}
931     	if (task->tk_client) {
932     		rpc_release_client(task->tk_client);
933     		task->tk_client = NULL;
934     	}
935     
936     #ifdef RPC_DEBUG
937     	task->tk_magic = 0;
938     #endif
939     	if (task->tk_release)
940     		task->tk_release(task);
941     }
942     
943     /**
944      * rpc_find_parent - find the parent of a child task.
945      * @child: child task
946      *
947      * Checks that the parent task is still sleeping on the
948      * queue 'childq'. If so returns a pointer to the parent.
949      * Upon failure returns NULL.
950      *
951      * Caller must hold rpc_queue_lock
952      */
953     static inline struct rpc_task *
954     rpc_find_parent(struct rpc_task *child)
955     {
956     	struct rpc_task	*task, *parent;
957     
958     	parent = (struct rpc_task *) child->tk_calldata;
959     	if ((task = childq.task) != NULL) {
960     		do {
961     			if (task == parent)
962     				return parent;
963     		} while ((task = task->tk_next) != childq.task);
964     	}
965     	return NULL;
966     }
967     
968     static void
969     rpc_child_exit(struct rpc_task *child)
970     {
971     	struct rpc_task	*parent;
972     
973     	spin_lock_bh(&rpc_queue_lock);
974     	if ((parent = rpc_find_parent(child)) != NULL) {
975     		parent->tk_status = child->tk_status;
976     		__rpc_wake_up_task(parent);
977     	}
978     	spin_unlock_bh(&rpc_queue_lock);
979     }
980     
981     /*
982      * Note: rpc_new_task releases the client after a failure.
983      */
984     struct rpc_task *
985     rpc_new_child(struct rpc_clnt *clnt, struct rpc_task *parent)
986     {
987     	struct rpc_task	*task;
988     
989     	task = rpc_new_task(clnt, NULL, RPC_TASK_ASYNC | RPC_TASK_CHILD);
990     	if (!task)
991     		goto fail;
992     	task->tk_exit = rpc_child_exit;
993     	task->tk_calldata = parent;
994     	return task;
995     
996     fail:
997     	parent->tk_status = -ENOMEM;
998     	return NULL;
999     }
1000     
1001     void
1002     rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func)
1003     {
1004     	spin_lock_bh(&rpc_queue_lock);
1005     	/* N.B. Is it possible for the child to have already finished? */
1006     	__rpc_sleep_on(&childq, task, func, NULL);
1007     	rpc_schedule_run(child);
1008     	spin_unlock_bh(&rpc_queue_lock);
1009     }
1010     
1011     /*
1012      * Kill all tasks for the given client.
1013      * XXX: kill their descendants as well?
1014      */
1015     void
1016     rpc_killall_tasks(struct rpc_clnt *clnt)
1017     {
1018     	struct rpc_task	**q, *rovr;
1019     
1020     	dprintk("RPC:      killing all tasks for client %p\n", clnt);
1021     
1022     	/*
1023     	 * Spin lock all_tasks to prevent changes...
1024     	 */
1025     	spin_lock(&rpc_sched_lock);
1026     	for (q = &all_tasks; (rovr = *q); q = &rovr->tk_next_task) {
1027     		if (!clnt || rovr->tk_client == clnt) {
1028     			rovr->tk_flags |= RPC_TASK_KILLED;
1029     			rpc_exit(rovr, -EIO);
1030     			rpc_wake_up_task(rovr);
1031     		}
1032     	}
1033     	spin_unlock(&rpc_sched_lock);
1034     }
1035     
1036     static DECLARE_MUTEX_LOCKED(rpciod_running);
1037     
1038     static inline int
1039     rpciod_task_pending(void)
1040     {
1041     	return schedq.task != NULL || xprt_tcp_pending();
1042     }
1043     
1044     
1045     /*
1046      * This is the rpciod kernel thread
1047      */
1048     static int
1049     rpciod(void *ptr)
1050     {
1051     	wait_queue_head_t *assassin = (wait_queue_head_t*) ptr;
1052     	int		rounds = 0;
1053     
1054     	MOD_INC_USE_COUNT;
1055     	lock_kernel();
1056     	/*
1057     	 * Let our maker know we're running ...
1058     	 */
1059     	rpciod_pid = current->pid;
1060     	up(&rpciod_running);
1061     
1062     	daemonize();
1063     
1064     	spin_lock_irq(&current->sigmask_lock);
1065     	siginitsetinv(&current->blocked, sigmask(SIGKILL));
1066     	recalc_sigpending(current);
1067     	spin_unlock_irq(&current->sigmask_lock);
1068     
1069     	strcpy(current->comm, "rpciod");
1070     
1071     	current->flags |= PF_MEMALLOC;
1072     
1073     	dprintk("RPC: rpciod starting (pid %d)\n", rpciod_pid);
1074     	while (rpciod_users) {
1075     		if (signalled()) {
1076     			rpciod_killall();
1077     			flush_signals(current);
1078     		}
1079     		__rpc_schedule();
1080     
1081     		if (++rounds >= 64) {	/* safeguard */
1082     			schedule();
1083     			rounds = 0;
1084     		}
1085     
1086     		if (!rpciod_task_pending()) {
1087     			dprintk("RPC: rpciod back to sleep\n");
1088     			wait_event_interruptible(rpciod_idle, rpciod_task_pending());
1089     			dprintk("RPC: switch to rpciod\n");
1090     			rounds = 0;
1091     		}
1092     	}
1093     
1094     	dprintk("RPC: rpciod shutdown commences\n");
1095     	if (all_tasks) {
1096     		printk(KERN_ERR "rpciod: active tasks at shutdown?!\n");
1097     		rpciod_killall();
1098     	}
1099     
1100     	rpciod_pid = 0;
1101     	wake_up(assassin);
1102     
1103     	dprintk("RPC: rpciod exiting\n");
1104     	MOD_DEC_USE_COUNT;
1105     	return 0;
1106     }
1107     
1108     static void
1109     rpciod_killall(void)
1110     {
1111     	unsigned long flags;
1112     
1113     	while (all_tasks) {
1114     		current->sigpending = 0;
1115     		rpc_killall_tasks(NULL);
1116     		__rpc_schedule();
1117     		if (all_tasks) {
1118     			dprintk("rpciod_killall: waiting for tasks to exit\n");
1119     			set_current_state(TASK_INTERRUPTIBLE);
1120     			schedule_timeout(1);
1121     		}
1122     	}
1123     
1124     	spin_lock_irqsave(&current->sigmask_lock, flags);
1125     	recalc_sigpending(current);
1126     	spin_unlock_irqrestore(&current->sigmask_lock, flags);
1127     }
1128     
1129     /*
1130      * Start up the rpciod process if it's not already running.
1131      */
1132     int
1133     rpciod_up(void)
1134     {
1135     	int error = 0;
1136     
1137     	MOD_INC_USE_COUNT;
1138     	down(&rpciod_sema);
1139     	dprintk("rpciod_up: pid %d, users %d\n", rpciod_pid, rpciod_users);
1140     	rpciod_users++;
1141     	if (rpciod_pid)
1142     		goto out;
1143     	/*
1144     	 * If there's no pid, we should be the first user.
1145     	 */
1146     	if (rpciod_users > 1)
1147     		printk(KERN_WARNING "rpciod_up: no pid, %d users??\n", rpciod_users);
1148     	/*
1149     	 * Create the rpciod thread and wait for it to start.
1150     	 */
1151     	error = kernel_thread(rpciod, &rpciod_killer, 0);
1152     	if (error < 0) {
1153     		printk(KERN_WARNING "rpciod_up: create thread failed, error=%d\n", error);
1154     		rpciod_users--;
1155     		goto out;
1156     	}
1157     	down(&rpciod_running);
1158     	error = 0;
1159     out:
1160     	up(&rpciod_sema);
1161     	MOD_DEC_USE_COUNT;
1162     	return error;
1163     }
1164     
1165     void
1166     rpciod_down(void)
1167     {
1168     	unsigned long flags;
1169     
1170     	MOD_INC_USE_COUNT;
1171     	down(&rpciod_sema);
1172     	dprintk("rpciod_down pid %d sema %d\n", rpciod_pid, rpciod_users);
1173     	if (rpciod_users) {
1174     		if (--rpciod_users)
1175     			goto out;
1176     	} else
1177     		printk(KERN_WARNING "rpciod_down: pid=%d, no users??\n", rpciod_pid);
1178     
1179     	if (!rpciod_pid) {
1180     		dprintk("rpciod_down: Nothing to do!\n");
1181     		goto out;
1182     	}
1183     
1184     	kill_proc(rpciod_pid, SIGKILL, 1);
1185     	/*
1186     	 * Usually rpciod will exit very quickly, so we
1187     	 * wait briefly before checking the process id.
1188     	 */
1189     	current->sigpending = 0;
1190     	set_current_state(TASK_INTERRUPTIBLE);
1191     	schedule_timeout(1);
1192     	/*
1193     	 * Display a message if we're going to wait longer.
1194     	 */
1195     	while (rpciod_pid) {
1196     		dprintk("rpciod_down: waiting for pid %d to exit\n", rpciod_pid);
1197     		if (signalled()) {
1198     			dprintk("rpciod_down: caught signal\n");
1199     			break;
1200     		}
1201     		interruptible_sleep_on(&rpciod_killer);
1202     	}
1203     	spin_lock_irqsave(&current->sigmask_lock, flags);
1204     	recalc_sigpending(current);
1205     	spin_unlock_irqrestore(&current->sigmask_lock, flags);
1206     out:
1207     	up(&rpciod_sema);
1208     	MOD_DEC_USE_COUNT;
1209     }
1210     
#ifdef RPC_DEBUG
/*
 * Debugging aid: print one line per task on the all_tasks list,
 * preceded by a column header.  Prints nothing when the list is
 * empty.  The list is walked with rpc_sched_lock held.
 */
void rpc_show_tasks(void)
{
	struct rpc_task *t;

	spin_lock(&rpc_sched_lock);
	t = all_tasks;
	if (t != NULL) {
		printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
			"-rpcwait -action- --exit--\n");
		do {
			printk("%05d %04d %04x %06d %8p %6d %8p %08ld %8s %8p %8p\n",
				t->tk_pid, t->tk_msg.rpc_proc, t->tk_flags, t->tk_status,
				t->tk_client, t->tk_client->cl_prog,
				t->tk_rqstp, t->tk_timeout,
				t->tk_rpcwait ? rpc_qname(t->tk_rpcwait) : " <NULL> ",
				t->tk_action, t->tk_exit);
			t = t->tk_next_task;
		} while (t != NULL);
	}
	spin_unlock(&rpc_sched_lock);
}
#endif
1236