File: /usr/src/linux/net/sunrpc/sched.c
1 /*
2 * linux/net/sunrpc/sched.c
3 *
4 * Scheduling for synchronous and asynchronous RPC requests.
5 *
6 * Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
7 *
8 * TCP NFS related read + write fixes
9 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
10 */
11
12 #include <linux/module.h>
13
14 #define __KERNEL_SYSCALLS__
15 #include <linux/sched.h>
16 #include <linux/interrupt.h>
17 #include <linux/slab.h>
18 #include <linux/unistd.h>
19 #include <linux/smp.h>
20 #include <linux/smp_lock.h>
21 #include <linux/spinlock.h>
22
23 #include <linux/sunrpc/clnt.h>
24
/*
 * Debug builds only: select the dprintk facility for this file and keep
 * a counter from which each new task's tk_pid is drawn.
 */
#ifdef RPC_DEBUG
#define RPCDBG_FACILITY		RPCDBG_SCHED
static int			rpc_task_id;
#endif

/*
 * RPC allocations get the same get_free_pages priority as NFS.
 */
#define GFP_RPC			GFP_NFS

/* Helpers defined further down but needed before their definition. */
static void	__rpc_default_timer(struct rpc_task *task);
static void	rpciod_killall(void);
37
38 /*
39 * When an asynchronous RPC task is activated within a bottom half
40 * handler, or while executing another RPC task, it is put on
41 * schedq, and rpciod is woken up.
42 */
43 static struct rpc_wait_queue schedq = RPC_INIT_WAITQ("schedq");
44
45 /*
46 * RPC tasks that create another task (e.g. for contacting the portmapper)
47 * will wait on this queue for their child's completion
48 */
49 static struct rpc_wait_queue childq = RPC_INIT_WAITQ("childq");
50
51 /*
52 * RPC tasks sit here while waiting for conditions to improve.
53 */
54 static struct rpc_wait_queue delay_queue = RPC_INIT_WAITQ("delayq");
55
56 /*
57 * All RPC tasks are linked into this list
58 */
59 static struct rpc_task * all_tasks;
60
61 /*
62 * rpciod-related stuff
63 */
64 static DECLARE_WAIT_QUEUE_HEAD(rpciod_idle);
65 static DECLARE_WAIT_QUEUE_HEAD(rpciod_killer);
66 static DECLARE_MUTEX(rpciod_sema);
67 static unsigned int rpciod_users;
68 static pid_t rpciod_pid;
69 static int rpc_inhibit;
70
71 /*
72 * Spinlock for wait queues. Access to the latter also has to be
73 * interrupt-safe in order to allow timers to wake up sleeping tasks.
74 */
75 spinlock_t rpc_queue_lock = SPIN_LOCK_UNLOCKED;
76 /*
77 * Spinlock for other critical sections of code.
78 */
79 static spinlock_t rpc_sched_lock = SPIN_LOCK_UNLOCKED;
80
81 /*
82 * This is the last-ditch buffer for NFS swap requests
83 */
84 static u32 swap_buffer[PAGE_SIZE >> 2];
85 static long swap_buffer_used;
86
87 /*
88 * Make allocation of the swap_buffer SMP-safe
89 */
90 static __inline__ int rpc_lock_swapbuf(void)
91 {
92 return !test_and_set_bit(1, &swap_buffer_used);
93 }
94 static __inline__ void rpc_unlock_swapbuf(void)
95 {
96 clear_bit(1, &swap_buffer_used);
97 }
98
99 /*
100 * Disable the timer for a given RPC task. Should be called with
101 * rpc_queue_lock and bh_disabled in order to avoid races within
102 * rpc_run_timer().
103 */
104 static inline void
105 __rpc_disable_timer(struct rpc_task *task)
106 {
107 dprintk("RPC: %4d disabling timer\n", task->tk_pid);
108 task->tk_timeout_fn = NULL;
109 task->tk_timeout = 0;
110 }
111
112 /*
113 * Run a timeout function.
114 * We use the callback in order to allow __rpc_wake_up_task()
115 * and friends to disable the timer synchronously on SMP systems
116 * without calling del_timer_sync(). The latter could cause a
117 * deadlock if called while we're holding spinlocks...
118 */
119 static void
120 rpc_run_timer(struct rpc_task *task)
121 {
122 void (*callback)(struct rpc_task *);
123
124 spin_lock_bh(&rpc_queue_lock);
125 callback = task->tk_timeout_fn;
126 task->tk_timeout_fn = NULL;
127 spin_unlock_bh(&rpc_queue_lock);
128 if (callback) {
129 dprintk("RPC: %4d running timer\n", task->tk_pid);
130 callback(task);
131 }
132 }
133
134 /*
135 * Set up a timer for the current task.
136 */
137 static inline void
138 __rpc_add_timer(struct rpc_task *task, rpc_action timer)
139 {
140 if (!task->tk_timeout)
141 return;
142
143 dprintk("RPC: %4d setting alarm for %lu ms\n",
144 task->tk_pid, task->tk_timeout * 1000 / HZ);
145
146 if (timer)
147 task->tk_timeout_fn = timer;
148 else
149 task->tk_timeout_fn = __rpc_default_timer;
150 mod_timer(&task->tk_timer, jiffies + task->tk_timeout);
151 }
152
153 /*
154 * Set up a timer for an already sleeping task.
155 */
156 void rpc_add_timer(struct rpc_task *task, rpc_action timer)
157 {
158 spin_lock_bh(&rpc_queue_lock);
159 if (!(RPC_IS_RUNNING(task) || task->tk_wakeup))
160 __rpc_add_timer(task, timer);
161 spin_unlock_bh(&rpc_queue_lock);
162 }
163
164 /*
165 * Delete any timer for the current task. Because we use del_timer_sync(),
166 * this function should never be called while holding rpc_queue_lock.
167 */
168 static inline void
169 rpc_delete_timer(struct rpc_task *task)
170 {
171 if (timer_pending(&task->tk_timer)) {
172 dprintk("RPC: %4d deleting timer\n", task->tk_pid);
173 del_timer_sync(&task->tk_timer);
174 }
175 }
176
177 /*
178 * Add new request to wait queue.
179 *
180 * Swapper tasks always get inserted at the head of the queue.
181 * This should avoid many nasty memory deadlocks and hopefully
182 * improve overall performance.
183 * Everyone else gets appended to the queue to ensure proper FIFO behavior.
184 */
185 static inline int
186 __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
187 {
188 if (task->tk_rpcwait == queue)
189 return 0;
190
191 if (task->tk_rpcwait) {
192 printk(KERN_WARNING "RPC: doubly enqueued task!\n");
193 return -EWOULDBLOCK;
194 }
195 if (RPC_IS_SWAPPER(task))
196 rpc_insert_list(&queue->task, task);
197 else
198 rpc_append_list(&queue->task, task);
199 task->tk_rpcwait = queue;
200
201 dprintk("RPC: %4d added to queue %p \"%s\"\n",
202 task->tk_pid, queue, rpc_qname(queue));
203
204 return 0;
205 }
206
207 int
208 rpc_add_wait_queue(struct rpc_wait_queue *q, struct rpc_task *task)
209 {
210 int result;
211
212 spin_lock_bh(&rpc_queue_lock);
213 result = __rpc_add_wait_queue(q, task);
214 spin_unlock_bh(&rpc_queue_lock);
215 return result;
216 }
217
218 /*
219 * Remove request from queue.
220 * Note: must be called with spin lock held.
221 */
222 static inline void
223 __rpc_remove_wait_queue(struct rpc_task *task)
224 {
225 struct rpc_wait_queue *queue = task->tk_rpcwait;
226
227 if (!queue)
228 return;
229
230 rpc_remove_list(&queue->task, task);
231 task->tk_rpcwait = NULL;
232
233 dprintk("RPC: %4d removed from queue %p \"%s\"\n",
234 task->tk_pid, queue, rpc_qname(queue));
235 }
236
237 void
238 rpc_remove_wait_queue(struct rpc_task *task)
239 {
240 if (!task->tk_rpcwait)
241 return;
242 spin_lock_bh(&rpc_queue_lock);
243 __rpc_remove_wait_queue(task);
244 spin_unlock_bh(&rpc_queue_lock);
245 }
246
247 /*
248 * Make an RPC task runnable.
249 *
250 * Note: If the task is ASYNC, this must be called with
251 * the spinlock held to protect the wait queue operation.
252 */
253 static inline void
254 rpc_make_runnable(struct rpc_task *task)
255 {
256 if (task->tk_timeout_fn) {
257 printk(KERN_ERR "RPC: task w/ running timer in rpc_make_runnable!!\n");
258 return;
259 }
260 rpc_set_running(task);
261 if (RPC_IS_ASYNC(task)) {
262 if (RPC_IS_SLEEPING(task)) {
263 int status;
264 status = __rpc_add_wait_queue(&schedq, task);
265 if (status < 0) {
266 printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
267 task->tk_status = status;
268 return;
269 }
270 rpc_clear_sleeping(task);
271 if (waitqueue_active(&rpciod_idle))
272 wake_up(&rpciod_idle);
273 }
274 } else {
275 rpc_clear_sleeping(task);
276 if (waitqueue_active(&task->tk_wait))
277 wake_up(&task->tk_wait);
278 }
279 }
280
281 /*
282 * Place a newly initialized task on the schedq.
283 */
284 static inline void
285 rpc_schedule_run(struct rpc_task *task)
286 {
287 /* Don't run a child twice! */
288 if (RPC_IS_ACTIVATED(task))
289 return;
290 task->tk_active = 1;
291 rpc_set_sleeping(task);
292 rpc_make_runnable(task);
293 }
294
295 /*
296 * For other people who may need to wake the I/O daemon
297 * but should (for now) know nothing about its innards
298 */
299 void rpciod_wake_up(void)
300 {
301 if(rpciod_pid==0)
302 printk(KERN_ERR "rpciod: wot no daemon?\n");
303 if (waitqueue_active(&rpciod_idle))
304 wake_up(&rpciod_idle);
305 }
306
307 /*
308 * Prepare for sleeping on a wait queue.
309 * By always appending tasks to the list we ensure FIFO behavior.
310 * NB: An RPC task will only receive interrupt-driven events as long
311 * as it's on a wait queue.
312 */
313 static void
314 __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
315 rpc_action action, rpc_action timer)
316 {
317 int status;
318
319 dprintk("RPC: %4d sleep_on(queue \"%s\" time %ld)\n", task->tk_pid,
320 rpc_qname(q), jiffies);
321
322 if (!RPC_IS_ASYNC(task) && !RPC_IS_ACTIVATED(task)) {
323 printk(KERN_ERR "RPC: Inactive synchronous task put to sleep!\n");
324 return;
325 }
326
327 /* Mark the task as being activated if so needed */
328 if (!RPC_IS_ACTIVATED(task)) {
329 task->tk_active = 1;
330 rpc_set_sleeping(task);
331 }
332
333 status = __rpc_add_wait_queue(q, task);
334 if (status) {
335 printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
336 task->tk_status = status;
337 } else {
338 rpc_clear_running(task);
339 if (task->tk_callback) {
340 dprintk(KERN_ERR "RPC: %4d overwrites an active callback\n", task->tk_pid);
341 BUG();
342 }
343 task->tk_callback = action;
344 __rpc_add_timer(task, timer);
345 }
346 }
347
348 void
349 rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
350 rpc_action action, rpc_action timer)
351 {
352 /*
353 * Protect the queue operations.
354 */
355 spin_lock_bh(&rpc_queue_lock);
356 __rpc_sleep_on(q, task, action, timer);
357 spin_unlock_bh(&rpc_queue_lock);
358 }
359
360 void
361 rpc_sleep_locked(struct rpc_wait_queue *q, struct rpc_task *task,
362 rpc_action action, rpc_action timer)
363 {
364 /*
365 * Protect the queue operations.
366 */
367 spin_lock_bh(&rpc_queue_lock);
368 __rpc_sleep_on(q, task, action, timer);
369 __rpc_lock_task(task);
370 spin_unlock_bh(&rpc_queue_lock);
371 }
372
373 /**
374 * __rpc_wake_up_task - wake up a single rpc_task
375 * @task: task to be woken up
376 *
377 * If the task is locked, it is merely removed from the queue, and
378 * 'task->tk_wakeup' is set. rpc_unlock_task() will then ensure
379 * that it is woken up as soon as the lock count goes to zero.
380 *
381 * Caller must hold rpc_queue_lock
382 */
383 static void
384 __rpc_wake_up_task(struct rpc_task *task)
385 {
386 dprintk("RPC: %4d __rpc_wake_up_task (now %ld inh %d)\n",
387 task->tk_pid, jiffies, rpc_inhibit);
388
389 #ifdef RPC_DEBUG
390 if (task->tk_magic != 0xf00baa) {
391 printk(KERN_ERR "RPC: attempt to wake up non-existing task!\n");
392 rpc_debug = ~0;
393 rpc_show_tasks();
394 return;
395 }
396 #endif
397 /* Has the task been executed yet? If not, we cannot wake it up! */
398 if (!RPC_IS_ACTIVATED(task)) {
399 printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
400 return;
401 }
402 if (RPC_IS_RUNNING(task))
403 return;
404
405 __rpc_disable_timer(task);
406 if (task->tk_rpcwait != &schedq)
407 __rpc_remove_wait_queue(task);
408
409 /* If the task has been locked, then set tk_wakeup so that
410 * rpc_unlock_task() wakes us up... */
411 if (task->tk_lock) {
412 task->tk_wakeup = 1;
413 return;
414 } else
415 task->tk_wakeup = 0;
416
417 rpc_make_runnable(task);
418
419 dprintk("RPC: __rpc_wake_up_task done\n");
420 }
421
422 /*
423 * Default timeout handler if none specified by user
424 */
425 static void
426 __rpc_default_timer(struct rpc_task *task)
427 {
428 dprintk("RPC: %d timeout (default timer)\n", task->tk_pid);
429 task->tk_status = -ETIMEDOUT;
430 rpc_wake_up_task(task);
431 }
432
433 /*
434 * Wake up the specified task
435 */
436 void
437 rpc_wake_up_task(struct rpc_task *task)
438 {
439 if (RPC_IS_RUNNING(task))
440 return;
441 spin_lock_bh(&rpc_queue_lock);
442 __rpc_wake_up_task(task);
443 spin_unlock_bh(&rpc_queue_lock);
444 }
445
446 /*
447 * Wake up the next task on the wait queue.
448 */
449 struct rpc_task *
450 rpc_wake_up_next(struct rpc_wait_queue *queue)
451 {
452 struct rpc_task *task;
453
454 dprintk("RPC: wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue));
455 spin_lock_bh(&rpc_queue_lock);
456 if ((task = queue->task) != 0)
457 __rpc_wake_up_task(task);
458 spin_unlock_bh(&rpc_queue_lock);
459
460 return task;
461 }
462
463 /**
464 * rpc_wake_up - wake up all rpc_tasks
465 * @queue: rpc_wait_queue on which the tasks are sleeping
466 *
467 * Grabs rpc_queue_lock
468 */
469 void
470 rpc_wake_up(struct rpc_wait_queue *queue)
471 {
472 spin_lock_bh(&rpc_queue_lock);
473 while (queue->task)
474 __rpc_wake_up_task(queue->task);
475 spin_unlock_bh(&rpc_queue_lock);
476 }
477
478 /**
479 * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
480 * @queue: rpc_wait_queue on which the tasks are sleeping
481 * @status: status value to set
482 *
483 * Grabs rpc_queue_lock
484 */
485 void
486 rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
487 {
488 struct rpc_task *task;
489
490 spin_lock_bh(&rpc_queue_lock);
491 while ((task = queue->task) != NULL) {
492 task->tk_status = status;
493 __rpc_wake_up_task(task);
494 }
495 spin_unlock_bh(&rpc_queue_lock);
496 }
497
498 /*
499 * Lock down a sleeping task to prevent it from waking up
500 * and disappearing from beneath us.
501 *
502 * This function should always be called with the
503 * rpc_queue_lock held.
504 */
505 int
506 __rpc_lock_task(struct rpc_task *task)
507 {
508 if (!RPC_IS_RUNNING(task))
509 return ++task->tk_lock;
510 return 0;
511 }
512
513 void
514 rpc_unlock_task(struct rpc_task *task)
515 {
516 spin_lock_bh(&rpc_queue_lock);
517 if (task->tk_lock && !--task->tk_lock && task->tk_wakeup)
518 __rpc_wake_up_task(task);
519 spin_unlock_bh(&rpc_queue_lock);
520 }
521
522 /*
523 * Run a task at a later time
524 */
525 static void __rpc_atrun(struct rpc_task *);
526 void
527 rpc_delay(struct rpc_task *task, unsigned long delay)
528 {
529 task->tk_timeout = delay;
530 rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
531 }
532
533 static void
534 __rpc_atrun(struct rpc_task *task)
535 {
536 task->tk_status = 0;
537 rpc_wake_up_task(task);
538 }
539
540 /*
541 * This is the RPC `scheduler' (or rather, the finite state machine).
542 */
543 static int
544 __rpc_execute(struct rpc_task *task)
545 {
546 int status = 0;
547
548 dprintk("RPC: %4d rpc_execute flgs %x\n",
549 task->tk_pid, task->tk_flags);
550
551 if (!RPC_IS_RUNNING(task)) {
552 printk(KERN_WARNING "RPC: rpc_execute called for sleeping task!!\n");
553 return 0;
554 }
555
556 restarted:
557 while (1) {
558 /*
559 * Execute any pending callback.
560 */
561 if (RPC_DO_CALLBACK(task)) {
562 /* Define a callback save pointer */
563 void (*save_callback)(struct rpc_task *);
564
565 /*
566 * If a callback exists, save it, reset it,
567 * call it.
568 * The save is needed to stop from resetting
569 * another callback set within the callback handler
570 * - Dave
571 */
572 save_callback=task->tk_callback;
573 task->tk_callback=NULL;
574 save_callback(task);
575 }
576
577 /*
578 * Perform the next FSM step.
579 * tk_action may be NULL when the task has been killed
580 * by someone else.
581 */
582 if (RPC_IS_RUNNING(task)) {
583 /*
584 * Garbage collection of pending timers...
585 */
586 rpc_delete_timer(task);
587 if (!task->tk_action)
588 break;
589 task->tk_action(task);
590 }
591
592 /*
593 * Check whether task is sleeping.
594 */
595 spin_lock_bh(&rpc_queue_lock);
596 if (!RPC_IS_RUNNING(task)) {
597 rpc_set_sleeping(task);
598 if (RPC_IS_ASYNC(task)) {
599 spin_unlock_bh(&rpc_queue_lock);
600 return 0;
601 }
602 }
603 spin_unlock_bh(&rpc_queue_lock);
604
605 while (RPC_IS_SLEEPING(task)) {
606 /* sync task: sleep here */
607 dprintk("RPC: %4d sync task going to sleep\n",
608 task->tk_pid);
609 if (current->pid == rpciod_pid)
610 printk(KERN_ERR "RPC: rpciod waiting on sync task!\n");
611
612 __wait_event(task->tk_wait, !RPC_IS_SLEEPING(task));
613 dprintk("RPC: %4d sync task resuming\n", task->tk_pid);
614
615 /*
616 * When a sync task receives a signal, it exits with
617 * -ERESTARTSYS. In order to catch any callbacks that
618 * clean up after sleeping on some queue, we don't
619 * break the loop here, but go around once more.
620 */
621 if (task->tk_client->cl_intr && signalled()) {
622 dprintk("RPC: %4d got signal\n", task->tk_pid);
623 task->tk_flags |= RPC_TASK_KILLED;
624 rpc_exit(task, -ERESTARTSYS);
625 rpc_wake_up_task(task);
626 }
627 }
628 }
629
630 if (task->tk_exit) {
631 task->tk_exit(task);
632 /* If tk_action is non-null, the user wants us to restart */
633 if (task->tk_action) {
634 if (!RPC_ASSASSINATED(task)) {
635 /* Release RPC slot and buffer memory */
636 if (task->tk_rqstp)
637 xprt_release(task);
638 if (task->tk_buffer) {
639 rpc_free(task->tk_buffer);
640 task->tk_buffer = NULL;
641 }
642 goto restarted;
643 }
644 printk(KERN_ERR "RPC: dead task tries to walk away.\n");
645 }
646 }
647
648 dprintk("RPC: %4d exit() = %d\n", task->tk_pid, task->tk_status);
649 status = task->tk_status;
650
651 /* Release all resources associated with the task */
652 rpc_release_task(task);
653
654 return status;
655 }
656
657 /*
658 * User-visible entry point to the scheduler.
659 *
660 * This may be called recursively if e.g. an async NFS task updates
661 * the attributes and finds that dirty pages must be flushed.
662 * NOTE: Upon exit of this function the task is guaranteed to be
663 * released. In particular note that tk_release() will have
664 * been called, so your task memory may have been freed.
665 */
666 int
667 rpc_execute(struct rpc_task *task)
668 {
669 int status = -EIO;
670 if (rpc_inhibit) {
671 printk(KERN_INFO "RPC: execution inhibited!\n");
672 goto out_release;
673 }
674
675 status = -EWOULDBLOCK;
676 if (task->tk_active) {
677 printk(KERN_ERR "RPC: active task was run twice!\n");
678 goto out_err;
679 }
680
681 task->tk_active = 1;
682 rpc_set_running(task);
683 return __rpc_execute(task);
684 out_release:
685 rpc_release_task(task);
686 out_err:
687 return status;
688 }
689
690 /*
691 * This is our own little scheduler for async RPC tasks.
692 */
693 static void
694 __rpc_schedule(void)
695 {
696 struct rpc_task *task;
697 int count = 0;
698
699 dprintk("RPC: rpc_schedule enter\n");
700 while (1) {
701 /* Ensure equal rights for tcp tasks... */
702 rpciod_tcp_dispatcher();
703
704 spin_lock_bh(&rpc_queue_lock);
705 if (!(task = schedq.task)) {
706 spin_unlock_bh(&rpc_queue_lock);
707 break;
708 }
709 if (task->tk_lock) {
710 spin_unlock_bh(&rpc_queue_lock);
711 printk(KERN_ERR "RPC: Locked task was scheduled !!!!\n");
712 #ifdef RPC_DEBUG
713 rpc_debug = ~0;
714 rpc_show_tasks();
715 #endif
716 break;
717 }
718 __rpc_remove_wait_queue(task);
719 spin_unlock_bh(&rpc_queue_lock);
720
721 __rpc_execute(task);
722
723 if (++count >= 200 || current->need_resched) {
724 count = 0;
725 schedule();
726 }
727 }
728 dprintk("RPC: rpc_schedule leave\n");
729 }
730
731 /*
732 * Allocate memory for RPC purpose.
733 *
734 * This is yet another tricky issue: For sync requests issued by
735 * a user process, we want to make kmalloc sleep if there isn't
736 * enough memory. Async requests should not sleep too excessively
737 * because that will block rpciod (but that's not dramatic when
738 * it's starved of memory anyway). Finally, swapout requests should
739 * never sleep at all, and should not trigger another swap_out
740 * request through kmalloc which would just increase memory contention.
741 *
742 * I hope the following gets it right, which gives async requests
743 * a slight advantage over sync requests (good for writeback, debatable
744 * for readahead):
745 *
746 * sync user requests: GFP_KERNEL
747 * async requests: GFP_RPC (== GFP_NFS)
748 * swap requests: GFP_ATOMIC (or new GFP_SWAPPER)
749 */
750 void *
751 rpc_allocate(unsigned int flags, unsigned int size)
752 {
753 u32 *buffer;
754 int gfp;
755
756 if (flags & RPC_TASK_SWAPPER)
757 gfp = GFP_ATOMIC;
758 else if (flags & RPC_TASK_ASYNC)
759 gfp = GFP_RPC;
760 else
761 gfp = GFP_KERNEL;
762
763 do {
764 if ((buffer = (u32 *) kmalloc(size, gfp)) != NULL) {
765 dprintk("RPC: allocated buffer %p\n", buffer);
766 return buffer;
767 }
768 if ((flags & RPC_TASK_SWAPPER) && size <= sizeof(swap_buffer)
769 && rpc_lock_swapbuf()) {
770 dprintk("RPC: used last-ditch swap buffer\n");
771 return swap_buffer;
772 }
773 if (flags & RPC_TASK_ASYNC)
774 return NULL;
775 set_current_state(TASK_INTERRUPTIBLE);
776 schedule_timeout(HZ>>4);
777 } while (!signalled());
778
779 return NULL;
780 }
781
782 void
783 rpc_free(void *buffer)
784 {
785 if (buffer != swap_buffer) {
786 kfree(buffer);
787 return;
788 }
789 rpc_unlock_swapbuf();
790 }
791
792 /*
793 * Creation and deletion of RPC task structures
794 */
795 inline void
796 rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt,
797 rpc_action callback, int flags)
798 {
799 memset(task, 0, sizeof(*task));
800 init_timer(&task->tk_timer);
801 task->tk_timer.data = (unsigned long) task;
802 task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
803 task->tk_client = clnt;
804 task->tk_flags = flags;
805 task->tk_exit = callback;
806 init_waitqueue_head(&task->tk_wait);
807 if (current->uid != current->fsuid || current->gid != current->fsgid)
808 task->tk_flags |= RPC_TASK_SETUID;
809
810 /* Initialize retry counters */
811 task->tk_garb_retry = 2;
812 task->tk_cred_retry = 2;
813 task->tk_suid_retry = 1;
814
815 /* Add to global list of all tasks */
816 spin_lock(&rpc_sched_lock);
817 task->tk_next_task = all_tasks;
818 task->tk_prev_task = NULL;
819 if (all_tasks)
820 all_tasks->tk_prev_task = task;
821 all_tasks = task;
822 spin_unlock(&rpc_sched_lock);
823
824 if (clnt)
825 atomic_inc(&clnt->cl_users);
826
827 #ifdef RPC_DEBUG
828 task->tk_magic = 0xf00baa;
829 task->tk_pid = rpc_task_id++;
830 #endif
831 dprintk("RPC: %4d new task procpid %d\n", task->tk_pid,
832 current->pid);
833 }
834
835 static void
836 rpc_default_free_task(struct rpc_task *task)
837 {
838 dprintk("RPC: %4d freeing task\n", task->tk_pid);
839 rpc_free(task);
840 }
841
842 /*
843 * Create a new task for the specified client. We have to
844 * clean up after an allocation failure, as the client may
845 * have specified "oneshot".
846 */
847 struct rpc_task *
848 rpc_new_task(struct rpc_clnt *clnt, rpc_action callback, int flags)
849 {
850 struct rpc_task *task;
851
852 task = (struct rpc_task *) rpc_allocate(flags, sizeof(*task));
853 if (!task)
854 goto cleanup;
855
856 rpc_init_task(task, clnt, callback, flags);
857
858 /* Replace tk_release */
859 task->tk_release = rpc_default_free_task;
860
861 dprintk("RPC: %4d allocated task\n", task->tk_pid);
862 task->tk_flags |= RPC_TASK_DYNAMIC;
863 out:
864 return task;
865
866 cleanup:
867 /* Check whether to release the client */
868 if (clnt) {
869 printk("rpc_new_task: failed, users=%d, oneshot=%d\n",
870 atomic_read(&clnt->cl_users), clnt->cl_oneshot);
871 atomic_inc(&clnt->cl_users); /* pretend we were used ... */
872 rpc_release_client(clnt);
873 }
874 goto out;
875 }
876
877 void
878 rpc_release_task(struct rpc_task *task)
879 {
880 struct rpc_task *next, *prev;
881
882 dprintk("RPC: %4d release task\n", task->tk_pid);
883
884 #ifdef RPC_DEBUG
885 if (task->tk_magic != 0xf00baa) {
886 printk(KERN_ERR "RPC: attempt to release a non-existing task!\n");
887 rpc_debug = ~0;
888 rpc_show_tasks();
889 return;
890 }
891 #endif
892
893 /* Remove from global task list */
894 spin_lock(&rpc_sched_lock);
895 prev = task->tk_prev_task;
896 next = task->tk_next_task;
897 if (next)
898 next->tk_prev_task = prev;
899 if (prev)
900 prev->tk_next_task = next;
901 else
902 all_tasks = next;
903 task->tk_next_task = task->tk_prev_task = NULL;
904 spin_unlock(&rpc_sched_lock);
905
906 /* Protect the execution below. */
907 spin_lock_bh(&rpc_queue_lock);
908
909 /* Disable timer to prevent zombie wakeup */
910 __rpc_disable_timer(task);
911
912 /* Remove from any wait queue we're still on */
913 __rpc_remove_wait_queue(task);
914
915 task->tk_active = 0;
916
917 spin_unlock_bh(&rpc_queue_lock);
918
919 /* Synchronously delete any running timer */
920 rpc_delete_timer(task);
921
922 /* Release resources */
923 if (task->tk_rqstp)
924 xprt_release(task);
925 if (task->tk_msg.rpc_cred)
926 rpcauth_unbindcred(task);
927 if (task->tk_buffer) {
928 rpc_free(task->tk_buffer);
929 task->tk_buffer = NULL;
930 }
931 if (task->tk_client) {
932 rpc_release_client(task->tk_client);
933 task->tk_client = NULL;
934 }
935
936 #ifdef RPC_DEBUG
937 task->tk_magic = 0;
938 #endif
939 if (task->tk_release)
940 task->tk_release(task);
941 }
942
943 /**
944 * rpc_find_parent - find the parent of a child task.
945 * @child: child task
946 *
947 * Checks that the parent task is still sleeping on the
948 * queue 'childq'. If so returns a pointer to the parent.
949 * Upon failure returns NULL.
950 *
951 * Caller must hold rpc_queue_lock
952 */
953 static inline struct rpc_task *
954 rpc_find_parent(struct rpc_task *child)
955 {
956 struct rpc_task *task, *parent;
957
958 parent = (struct rpc_task *) child->tk_calldata;
959 if ((task = childq.task) != NULL) {
960 do {
961 if (task == parent)
962 return parent;
963 } while ((task = task->tk_next) != childq.task);
964 }
965 return NULL;
966 }
967
968 static void
969 rpc_child_exit(struct rpc_task *child)
970 {
971 struct rpc_task *parent;
972
973 spin_lock_bh(&rpc_queue_lock);
974 if ((parent = rpc_find_parent(child)) != NULL) {
975 parent->tk_status = child->tk_status;
976 __rpc_wake_up_task(parent);
977 }
978 spin_unlock_bh(&rpc_queue_lock);
979 }
980
981 /*
982 * Note: rpc_new_task releases the client after a failure.
983 */
984 struct rpc_task *
985 rpc_new_child(struct rpc_clnt *clnt, struct rpc_task *parent)
986 {
987 struct rpc_task *task;
988
989 task = rpc_new_task(clnt, NULL, RPC_TASK_ASYNC | RPC_TASK_CHILD);
990 if (!task)
991 goto fail;
992 task->tk_exit = rpc_child_exit;
993 task->tk_calldata = parent;
994 return task;
995
996 fail:
997 parent->tk_status = -ENOMEM;
998 return NULL;
999 }
1000
1001 void
1002 rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func)
1003 {
1004 spin_lock_bh(&rpc_queue_lock);
1005 /* N.B. Is it possible for the child to have already finished? */
1006 __rpc_sleep_on(&childq, task, func, NULL);
1007 rpc_schedule_run(child);
1008 spin_unlock_bh(&rpc_queue_lock);
1009 }
1010
1011 /*
1012 * Kill all tasks for the given client.
1013 * XXX: kill their descendants as well?
1014 */
1015 void
1016 rpc_killall_tasks(struct rpc_clnt *clnt)
1017 {
1018 struct rpc_task **q, *rovr;
1019
1020 dprintk("RPC: killing all tasks for client %p\n", clnt);
1021
1022 /*
1023 * Spin lock all_tasks to prevent changes...
1024 */
1025 spin_lock(&rpc_sched_lock);
1026 for (q = &all_tasks; (rovr = *q); q = &rovr->tk_next_task) {
1027 if (!clnt || rovr->tk_client == clnt) {
1028 rovr->tk_flags |= RPC_TASK_KILLED;
1029 rpc_exit(rovr, -EIO);
1030 rpc_wake_up_task(rovr);
1031 }
1032 }
1033 spin_unlock(&rpc_sched_lock);
1034 }
1035
1036 static DECLARE_MUTEX_LOCKED(rpciod_running);
1037
1038 static inline int
1039 rpciod_task_pending(void)
1040 {
1041 return schedq.task != NULL || xprt_tcp_pending();
1042 }
1043
1044
1045 /*
1046 * This is the rpciod kernel thread
1047 */
1048 static int
1049 rpciod(void *ptr)
1050 {
1051 wait_queue_head_t *assassin = (wait_queue_head_t*) ptr;
1052 int rounds = 0;
1053
1054 MOD_INC_USE_COUNT;
1055 lock_kernel();
1056 /*
1057 * Let our maker know we're running ...
1058 */
1059 rpciod_pid = current->pid;
1060 up(&rpciod_running);
1061
1062 daemonize();
1063
1064 spin_lock_irq(¤t->sigmask_lock);
1065 siginitsetinv(¤t->blocked, sigmask(SIGKILL));
1066 recalc_sigpending(current);
1067 spin_unlock_irq(¤t->sigmask_lock);
1068
1069 strcpy(current->comm, "rpciod");
1070
1071 current->flags |= PF_MEMALLOC;
1072
1073 dprintk("RPC: rpciod starting (pid %d)\n", rpciod_pid);
1074 while (rpciod_users) {
1075 if (signalled()) {
1076 rpciod_killall();
1077 flush_signals(current);
1078 }
1079 __rpc_schedule();
1080
1081 if (++rounds >= 64) { /* safeguard */
1082 schedule();
1083 rounds = 0;
1084 }
1085
1086 if (!rpciod_task_pending()) {
1087 dprintk("RPC: rpciod back to sleep\n");
1088 wait_event_interruptible(rpciod_idle, rpciod_task_pending());
1089 dprintk("RPC: switch to rpciod\n");
1090 rounds = 0;
1091 }
1092 }
1093
1094 dprintk("RPC: rpciod shutdown commences\n");
1095 if (all_tasks) {
1096 printk(KERN_ERR "rpciod: active tasks at shutdown?!\n");
1097 rpciod_killall();
1098 }
1099
1100 rpciod_pid = 0;
1101 wake_up(assassin);
1102
1103 dprintk("RPC: rpciod exiting\n");
1104 MOD_DEC_USE_COUNT;
1105 return 0;
1106 }
1107
1108 static void
1109 rpciod_killall(void)
1110 {
1111 unsigned long flags;
1112
1113 while (all_tasks) {
1114 current->sigpending = 0;
1115 rpc_killall_tasks(NULL);
1116 __rpc_schedule();
1117 if (all_tasks) {
1118 dprintk("rpciod_killall: waiting for tasks to exit\n");
1119 set_current_state(TASK_INTERRUPTIBLE);
1120 schedule_timeout(1);
1121 }
1122 }
1123
1124 spin_lock_irqsave(¤t->sigmask_lock, flags);
1125 recalc_sigpending(current);
1126 spin_unlock_irqrestore(¤t->sigmask_lock, flags);
1127 }
1128
1129 /*
1130 * Start up the rpciod process if it's not already running.
1131 */
1132 int
1133 rpciod_up(void)
1134 {
1135 int error = 0;
1136
1137 MOD_INC_USE_COUNT;
1138 down(&rpciod_sema);
1139 dprintk("rpciod_up: pid %d, users %d\n", rpciod_pid, rpciod_users);
1140 rpciod_users++;
1141 if (rpciod_pid)
1142 goto out;
1143 /*
1144 * If there's no pid, we should be the first user.
1145 */
1146 if (rpciod_users > 1)
1147 printk(KERN_WARNING "rpciod_up: no pid, %d users??\n", rpciod_users);
1148 /*
1149 * Create the rpciod thread and wait for it to start.
1150 */
1151 error = kernel_thread(rpciod, &rpciod_killer, 0);
1152 if (error < 0) {
1153 printk(KERN_WARNING "rpciod_up: create thread failed, error=%d\n", error);
1154 rpciod_users--;
1155 goto out;
1156 }
1157 down(&rpciod_running);
1158 error = 0;
1159 out:
1160 up(&rpciod_sema);
1161 MOD_DEC_USE_COUNT;
1162 return error;
1163 }
1164
1165 void
1166 rpciod_down(void)
1167 {
1168 unsigned long flags;
1169
1170 MOD_INC_USE_COUNT;
1171 down(&rpciod_sema);
1172 dprintk("rpciod_down pid %d sema %d\n", rpciod_pid, rpciod_users);
1173 if (rpciod_users) {
1174 if (--rpciod_users)
1175 goto out;
1176 } else
1177 printk(KERN_WARNING "rpciod_down: pid=%d, no users??\n", rpciod_pid);
1178
1179 if (!rpciod_pid) {
1180 dprintk("rpciod_down: Nothing to do!\n");
1181 goto out;
1182 }
1183
1184 kill_proc(rpciod_pid, SIGKILL, 1);
1185 /*
1186 * Usually rpciod will exit very quickly, so we
1187 * wait briefly before checking the process id.
1188 */
1189 current->sigpending = 0;
1190 set_current_state(TASK_INTERRUPTIBLE);
1191 schedule_timeout(1);
1192 /*
1193 * Display a message if we're going to wait longer.
1194 */
1195 while (rpciod_pid) {
1196 dprintk("rpciod_down: waiting for pid %d to exit\n", rpciod_pid);
1197 if (signalled()) {
1198 dprintk("rpciod_down: caught signal\n");
1199 break;
1200 }
1201 interruptible_sleep_on(&rpciod_killer);
1202 }
1203 spin_lock_irqsave(¤t->sigmask_lock, flags);
1204 recalc_sigpending(current);
1205 spin_unlock_irqrestore(¤t->sigmask_lock, flags);
1206 out:
1207 up(&rpciod_sema);
1208 MOD_DEC_USE_COUNT;
1209 }
1210
#ifdef RPC_DEBUG
/*
 * Debugging aid: print one line per task on the all_tasks list, under
 * rpc_sched_lock.  Prints nothing, not even the header, when the list
 * is empty.
 *
 * A task may have no client (rpc_init_task() accepts a NULL clnt), in
 * which case the program number is shown as 0.
 */
void rpc_show_tasks(void)
{
	struct rpc_task *t, *next;

	spin_lock(&rpc_sched_lock);
	t = all_tasks;
	if (!t) {
		spin_unlock(&rpc_sched_lock);
		return;
	}
	printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
		"-rpcwait -action- --exit--\n");
	for (; t; t = next) {
		next = t->tk_next_task;
		printk("%05d %04d %04x %06d %8p %6d %8p %08ld %8s %8p %8p\n",
			t->tk_pid, t->tk_msg.rpc_proc, t->tk_flags, t->tk_status,
			t->tk_client,
			(t->tk_client ? t->tk_client->cl_prog : 0),
			t->tk_rqstp, t->tk_timeout,
			t->tk_rpcwait ? rpc_qname(t->tk_rpcwait) : " <NULL> ",
			t->tk_action, t->tk_exit);
	}
	spin_unlock(&rpc_sched_lock);
}
#endif
1236