File: /usr/src/linux/net/sunrpc/svcsock.c

1     /*
2      * linux/net/sunrpc/svcsock.c
3      *
4      * These are the RPC server socket internals.
5      *
6      * The server scheduling algorithm does not always distribute the load
7      * evenly when servicing a single client. May need to modify the
8      * svc_sock_enqueue procedure...
9      *
10      * TCP support is largely untested and may be a little slow. The problem
11      * is that we currently do two separate recvfrom's, one for the 4-byte
12      * record length, and the second for the actual record. This could possibly
13      * be improved by always reading a minimum size of around 100 bytes and
14      * tucking any superfluous bytes away in a temporary store. Still, that
15      * leaves write requests out in the rain. An alternative may be to peek at
16      * the first skb in the queue, and if it matches the next TCP sequence
17      * number, to extract the record marker. Yuck.
18      *
19      * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de>
20      */
21     
22     #include <linux/sched.h>
23     #include <linux/errno.h>
24     #include <linux/fcntl.h>
25     #include <linux/net.h>
26     #include <linux/in.h>
27     #include <linux/inet.h>
28     #include <linux/udp.h>
29     #include <linux/version.h>
30     #include <linux/unistd.h>
31     #include <linux/slab.h>
32     #include <linux/netdevice.h>
33     #include <linux/skbuff.h>
34     #include <net/sock.h>
35     #include <net/checksum.h>
36     #include <net/ip.h>
37     #include <asm/uaccess.h>
38     #include <asm/ioctls.h>
39     
40     #include <linux/sunrpc/types.h>
41     #include <linux/sunrpc/xdr.h>
42     #include <linux/sunrpc/svcsock.h>
43     #include <linux/sunrpc/stats.h>
44     
45     /* SMP locking strategy:
46      *
47      * 	svc_sock->sk_lock and svc_serv->sv_lock protect their
48      *	respective structures.
49      *
50      *	Antideadlock ordering is sk_lock --> sv_lock.
51      */
52     
53     #define RPCDBG_FACILITY	RPCDBG_SVCSOCK
54     
55     
/*
 * Prototypes for routines that are referenced before their definitions
 * appear further down in this file.
 */
static struct svc_sock	*svc_setup_socket(struct svc_serv *serv,
					  struct socket *sock,
					  int *errp, int pmap_reg);
static void		svc_udp_data_ready(struct sock *sk, int count);
static int		svc_udp_recvfrom(struct svc_rqst *rqstp);
static int		svc_udp_sendto(struct svc_rqst *rqstp);
61     
62     
63     /*
64      * Queue up an idle server thread.  Must have serv->sv_lock held.
65      */
66     static inline void
67     svc_serv_enqueue(struct svc_serv *serv, struct svc_rqst *rqstp)
68     {
69     	rpc_append_list(&serv->sv_threads, rqstp);
70     }
71     
72     /*
73      * Dequeue an nfsd thread.  Must have serv->sv_lock held.
74      */
75     static inline void
76     svc_serv_dequeue(struct svc_serv *serv, struct svc_rqst *rqstp)
77     {
78     	rpc_remove_list(&serv->sv_threads, rqstp);
79     }
80     
81     /*
82      * Release an skbuff after use
83      */
84     static inline void
85     svc_release_skb(struct svc_rqst *rqstp)
86     {
87     	struct sk_buff *skb = rqstp->rq_skbuff;
88     
89     	if (!skb)
90     		return;
91     	rqstp->rq_skbuff = NULL;
92     
93     	dprintk("svc: service %p, releasing skb %p\n", rqstp, skb);
94     	skb_free_datagram(rqstp->rq_sock->sk_sk, skb);
95     }
96     
97     /*
98      * Queue up a socket with data pending. If there are idle nfsd
99      * processes, wake 'em up.
100      *
101      * This must be called with svsk->sk_lock held.
102      */
103     static void
104     svc_sock_enqueue(struct svc_sock *svsk)
105     {
106     	struct svc_serv	*serv = svsk->sk_server;
107     	struct svc_rqst	*rqstp;
108     
109     	/* NOTE: Local BH is already disabled by our caller. */
110     	spin_lock(&serv->sv_lock);
111     
112     	if (serv->sv_threads && serv->sv_sockets)
113     		printk(KERN_ERR
114     			"svc_sock_enqueue: threads and sockets both waiting??\n");
115     
116     	if (svsk->sk_busy) {
117     		/* Don't enqueue socket while daemon is receiving */
118     		dprintk("svc: socket %p busy, not enqueued\n", svsk->sk_sk);
119     		goto out_unlock;
120     	}
121     
122     	/* Mark socket as busy. It will remain in this state until the
123     	 * server has processed all pending data and put the socket back
124     	 * on the idle list.
125     	 */
126     	svsk->sk_busy = 1;
127     
128     	if ((rqstp = serv->sv_threads) != NULL) {
129     		dprintk("svc: socket %p served by daemon %p\n",
130     			svsk->sk_sk, rqstp);
131     		svc_serv_dequeue(serv, rqstp);
132     		if (rqstp->rq_sock)
133     			printk(KERN_ERR 
134     				"svc_sock_enqueue: server %p, rq_sock=%p!\n",
135     				rqstp, rqstp->rq_sock);
136     		rqstp->rq_sock = svsk;
137     		svsk->sk_inuse++;
138     		wake_up(&rqstp->rq_wait);
139     	} else {
140     		dprintk("svc: socket %p put into queue\n", svsk->sk_sk);
141     		rpc_append_list(&serv->sv_sockets, svsk);
142     		svsk->sk_qued = 1;
143     	}
144     
145     out_unlock:
146     	spin_unlock(&serv->sv_lock);
147     }
148     
149     /*
150      * Dequeue the first socket.  Must be called with the serv->sv_lock held.
151      */
152     static inline struct svc_sock *
153     svc_sock_dequeue(struct svc_serv *serv)
154     {
155     	struct svc_sock	*svsk;
156     
157     	if ((svsk = serv->sv_sockets) != NULL)
158     		rpc_remove_list(&serv->sv_sockets, svsk);
159     
160     	if (svsk) {
161     		dprintk("svc: socket %p dequeued, inuse=%d\n",
162     			svsk->sk_sk, svsk->sk_inuse);
163     		svsk->sk_qued = 0;
164     	}
165     
166     	return svsk;
167     }
168     
169     /*
170      * Having read count bytes from a socket, check whether it
171      * needs to be re-enqueued.
172      */
173     static inline void
174     svc_sock_received(struct svc_sock *svsk, int count)
175     {
176     	spin_lock_bh(&svsk->sk_lock);
177     	if ((svsk->sk_data -= count) < 0) {
178     		printk(KERN_NOTICE "svc: sk_data negative!\n");
179     		svsk->sk_data = 0;
180     	}
181     	svsk->sk_rqstp = NULL; /* XXX */
182     	svsk->sk_busy = 0;
183     	if (svsk->sk_conn || svsk->sk_data || svsk->sk_close) {
184     		dprintk("svc: socket %p re-enqueued after receive\n",
185     						svsk->sk_sk);
186     		svc_sock_enqueue(svsk);
187     	}
188     	spin_unlock_bh(&svsk->sk_lock);
189     }
190     
191     /*
192      * Dequeue a new connection.
193      */
194     static inline void
195     svc_sock_accepted(struct svc_sock *svsk)
196     {
197     	spin_lock_bh(&svsk->sk_lock);
198             svsk->sk_busy = 0;
199             svsk->sk_conn--;
200             if (svsk->sk_conn || svsk->sk_data || svsk->sk_close) {
201                     dprintk("svc: socket %p re-enqueued after accept\n",
202     						svsk->sk_sk);
203                     svc_sock_enqueue(svsk);
204             }
205     	spin_unlock_bh(&svsk->sk_lock);
206     }
207     
208     /*
209      * Release a socket after use.
210      */
211     static inline void
212     svc_sock_release(struct svc_rqst *rqstp)
213     {
214     	struct svc_sock	*svsk = rqstp->rq_sock;
215     	struct svc_serv	*serv = svsk->sk_server;
216     
217     	svc_release_skb(rqstp);
218     	rqstp->rq_sock = NULL;
219     
220     	spin_lock_bh(&serv->sv_lock);
221     	if (!--(svsk->sk_inuse) && svsk->sk_dead) {
222     		spin_unlock_bh(&serv->sv_lock);
223     		dprintk("svc: releasing dead socket\n");
224     		sock_release(svsk->sk_sock);
225     		kfree(svsk);
226     	}
227     	else
228     		spin_unlock_bh(&serv->sv_lock);
229     }
230     
231     /*
232      * External function to wake up a server waiting for data
233      */
234     void
235     svc_wake_up(struct svc_serv *serv)
236     {
237     	struct svc_rqst	*rqstp;
238     
239     	spin_lock_bh(&serv->sv_lock);
240     	if ((rqstp = serv->sv_threads) != NULL) {
241     		dprintk("svc: daemon %p woken up.\n", rqstp);
242     		/*
243     		svc_serv_dequeue(serv, rqstp);
244     		rqstp->rq_sock = NULL;
245     		 */
246     		wake_up(&rqstp->rq_wait);
247     	}
248     	spin_unlock_bh(&serv->sv_lock);
249     }
250     
251     /*
252      * Generic sendto routine
253      */
254     static int
255     svc_sendto(struct svc_rqst *rqstp, struct iovec *iov, int nr)
256     {
257     	mm_segment_t	oldfs;
258     	struct svc_sock	*svsk = rqstp->rq_sock;
259     	struct socket	*sock = svsk->sk_sock;
260     	struct msghdr	msg;
261     	int		i, buflen, len;
262     
263     	for (i = buflen = 0; i < nr; i++)
264     		buflen += iov[i].iov_len;
265     
266     	msg.msg_name    = &rqstp->rq_addr;
267     	msg.msg_namelen = sizeof(rqstp->rq_addr);
268     	msg.msg_iov     = iov;
269     	msg.msg_iovlen  = nr;
270     	msg.msg_control = NULL;
271     	msg.msg_controllen = 0;
272     
273     	msg.msg_flags	= MSG_DONTWAIT;
274     
275     	oldfs = get_fs(); set_fs(KERNEL_DS);
276     	len = sock_sendmsg(sock, &msg, buflen);
277     	set_fs(oldfs);
278     
279     	dprintk("svc: socket %p sendto([%p %Zu... ], %d, %d) = %d\n",
280     			rqstp->rq_sock, iov[0].iov_base, iov[0].iov_len, nr, buflen, len);
281     
282     	return len;
283     }
284     
285     /*
286      * Check input queue length
287      */
288     static int
289     svc_recv_available(struct svc_sock *svsk)
290     {
291     	mm_segment_t	oldfs;
292     	struct socket	*sock = svsk->sk_sock;
293     	int		avail, err;
294     
295     	oldfs = get_fs(); set_fs(KERNEL_DS);
296     	err = sock->ops->ioctl(sock, TIOCINQ, (unsigned long) &avail);
297     	set_fs(oldfs);
298     
299     	return (err >= 0)? avail : err;
300     }
301     
302     /*
303      * Generic recvfrom routine.
304      */
305     static int
306     svc_recvfrom(struct svc_rqst *rqstp, struct iovec *iov, int nr, int buflen)
307     {
308     	mm_segment_t	oldfs;
309     	struct msghdr	msg;
310     	struct socket	*sock;
311     	int		len, alen;
312     
313     	rqstp->rq_addrlen = sizeof(rqstp->rq_addr);
314     	sock = rqstp->rq_sock->sk_sock;
315     
316     	msg.msg_name    = &rqstp->rq_addr;
317     	msg.msg_namelen = sizeof(rqstp->rq_addr);
318     	msg.msg_iov     = iov;
319     	msg.msg_iovlen  = nr;
320     	msg.msg_control = NULL;
321     	msg.msg_controllen = 0;
322     
323     	msg.msg_flags	= MSG_DONTWAIT;
324     
325     	oldfs = get_fs(); set_fs(KERNEL_DS);
326     	len = sock_recvmsg(sock, &msg, buflen, MSG_DONTWAIT);
327     	set_fs(oldfs);
328     
329     	/* sock_recvmsg doesn't fill in the name/namelen, so we must..
330     	 * possibly we should cache this in the svc_sock structure
331     	 * at accept time. FIXME
332     	 */
333     	alen = sizeof(rqstp->rq_addr);
334     	sock->ops->getname(sock, (struct sockaddr *)&rqstp->rq_addr, &alen, 1);
335     
336     	dprintk("svc: socket %p recvfrom(%p, %Zu) = %d\n",
337     		rqstp->rq_sock, iov[0].iov_base, iov[0].iov_len, len);
338     
339     	return len;
340     }
341     
342     /*
343      * INET callback when data has been received on the socket.
344      */
345     static void
346     svc_udp_data_ready(struct sock *sk, int count)
347     {
348     	struct svc_sock	*svsk = (struct svc_sock *)(sk->user_data);
349     
350     	if (!svsk)
351     		goto out;
352     	dprintk("svc: socket %p(inet %p), count=%d, busy=%d\n",
353     		svsk, sk, count, svsk->sk_busy);
354     	spin_lock_bh(&svsk->sk_lock);
355     	svsk->sk_data = 1;
356     	svc_sock_enqueue(svsk);
357     	spin_unlock_bh(&svsk->sk_lock);
358      out:
359     	if (sk->sleep && waitqueue_active(sk->sleep))
360     		wake_up_interruptible(sk->sleep);
361     }
362     
363     /*
364      * Receive a datagram from a UDP socket.
365      */
366     static int
367     svc_udp_recvfrom(struct svc_rqst *rqstp)
368     {
369     	struct svc_sock	*svsk = rqstp->rq_sock;
370     	struct svc_serv	*serv = svsk->sk_server;
371     	struct sk_buff	*skb;
372     	u32		*data;
373     	int		err, len;
374     
375     	svsk->sk_data = 0;
376     	while ((skb = skb_recv_datagram(svsk->sk_sk, 0, 1, &err)) == NULL) {
377     		svc_sock_received(svsk, 0);
378     		if (err == -EAGAIN)
379     			return err;
380     		/* possibly an icmp error */
381     		dprintk("svc: recvfrom returned error %d\n", -err);
382     	}
383     
384     	/* Sorry. */
385     	if (skb_is_nonlinear(skb)) {
386     		if (skb_linearize(skb, GFP_KERNEL) != 0) {
387     			kfree_skb(skb);
388     			svc_sock_received(svsk, 0);
389     			return 0;
390     		}
391     	}
392     
393     	if (skb->ip_summed != CHECKSUM_UNNECESSARY) {
394     		if ((unsigned short)csum_fold(skb_checksum(skb, 0, skb->len, skb->csum))) {
395     			skb_free_datagram(svsk->sk_sk, skb);
396     			svc_sock_received(svsk, 0);
397     			return 0;
398     		}
399     	}
400     
401     	/* There may be more data */
402     	svsk->sk_data = 1;
403     
404     	len  = skb->len - sizeof(struct udphdr);
405     	data = (u32 *) (skb->data + sizeof(struct udphdr));
406     
407     	rqstp->rq_skbuff      = skb;
408     	rqstp->rq_argbuf.base = data;
409     	rqstp->rq_argbuf.buf  = data;
410     	rqstp->rq_argbuf.len  = (len >> 2);
411     	/* rqstp->rq_resbuf      = rqstp->rq_defbuf; */
412     	rqstp->rq_prot        = IPPROTO_UDP;
413     
414     	/* Get sender address */
415     	rqstp->rq_addr.sin_family = AF_INET;
416     	rqstp->rq_addr.sin_port = skb->h.uh->source;
417     	rqstp->rq_addr.sin_addr.s_addr = skb->nh.iph->saddr;
418     
419     	if (serv->sv_stats)
420     		serv->sv_stats->netudpcnt++;
421     
422     	/* One down, maybe more to go... */
423     	svsk->sk_sk->stamp = skb->stamp;
424     	svc_sock_received(svsk, 0);
425     
426     	return len;
427     }
428     
/*
 * Send the reply held in rqstp->rq_resbuf over UDP.
 *
 * -ECONNREFUSED triggers exactly one retry; -EAGAIN is turned into 0.
 * Any other result of svc_sendto() is returned unchanged.
 */
static int
svc_udp_sendto(struct svc_rqst *rqstp)
{
	struct svc_buf	*reply = &rqstp->rq_resbuf;
	int		status;

	/* Set up the first element of the reply iovec.
	 * Any other iovecs that may be in use have been taken
	 * care of by the server implementation itself.
	 */
	/* reply->base = reply->area; */
	reply->iov[0].iov_base = reply->base;
	reply->iov[0].iov_len  = reply->len << 2;

	status = svc_sendto(rqstp, reply->iov, reply->nriov);
	switch (status) {
	case -ECONNREFUSED:
		/* ICMP error on earlier request. */
		status = svc_sendto(rqstp, reply->iov, reply->nriov);
		break;
	case -EAGAIN:
		/* Ignore and wait for re-xmit */
		status = 0;
		break;
	default:
		break;
	}

	return status;
}
453     
454     static int
455     svc_udp_init(struct svc_sock *svsk)
456     {
457     	svsk->sk_sk->data_ready = svc_udp_data_ready;
458     	svsk->sk_recvfrom = svc_udp_recvfrom;
459     	svsk->sk_sendto = svc_udp_sendto;
460     
461     	return 0;
462     }
463     
464     /*
465      * A data_ready event on a listening socket means there's a connection
466      * pending. Do not use state_change as a substitute for it.
467      */
468     static void
469     svc_tcp_listen_data_ready(struct sock *sk, int count_unused)
470     {
471     	struct svc_sock	*svsk;
472     
473     	dprintk("svc: socket %p TCP (listen) state change %d\n",
474     			sk, sk->state);
475     
476     	if  (sk->state != TCP_ESTABLISHED) {
477     		/* Aborted connection, SYN_RECV or whatever... */
478     		goto out;
479     	}
480     	if (!(svsk = (struct svc_sock *) sk->user_data)) {
481     		printk("svc: socket %p: no user data\n", sk);
482     		goto out;
483     	}
484     	spin_lock_bh(&svsk->sk_lock);
485     	svsk->sk_conn++;
486     	svc_sock_enqueue(svsk);
487     	spin_unlock_bh(&svsk->sk_lock);
488      out:
489     	if (sk->sleep && waitqueue_active(sk->sleep))
490     		wake_up_interruptible_all(sk->sleep);
491     }
492     
493     /*
494      * A state change on a connected socket means it's dying or dead.
495      */
496     static void
497     svc_tcp_state_change(struct sock *sk)
498     {
499     	struct svc_sock	*svsk;
500     
501     	dprintk("svc: socket %p TCP (connected) state change %d (svsk %p)\n",
502     			sk, sk->state, sk->user_data);
503     
504     	if (!(svsk = (struct svc_sock *) sk->user_data)) {
505     		printk("svc: socket %p: no user data\n", sk);
506     		goto out;
507     	}
508     	spin_lock_bh(&svsk->sk_lock);
509     	svsk->sk_close = 1;
510     	svc_sock_enqueue(svsk);
511     	spin_unlock_bh(&svsk->sk_lock);
512      out:
513     	if (sk->sleep && waitqueue_active(sk->sleep))
514     		wake_up_interruptible_all(sk->sleep);
515     }
516     
517     static void
518     svc_tcp_data_ready(struct sock *sk, int count)
519     {
520     	struct svc_sock *	svsk;
521     
522     	dprintk("svc: socket %p TCP data ready (svsk %p)\n",
523     			sk, sk->user_data);
524     	if (!(svsk = (struct svc_sock *)(sk->user_data)))
525     		goto out;
526     	spin_lock_bh(&svsk->sk_lock);
527     	svsk->sk_data++;
528     	svc_sock_enqueue(svsk);
529     	spin_unlock_bh(&svsk->sk_lock);
530      out:
531     	if (sk->sleep && waitqueue_active(sk->sleep))
532     		wake_up_interruptible(sk->sleep);
533     }
534     
535     /*
536      * Accept a TCP connection
537      */
538     static void
539     svc_tcp_accept(struct svc_sock *svsk)
540     {
541     	struct sockaddr_in sin;
542     	struct svc_serv	*serv = svsk->sk_server;
543     	struct socket	*sock = svsk->sk_sock;
544     	struct socket	*newsock;
545     	struct proto_ops *ops;
546     	struct svc_sock	*newsvsk;
547     	int		err, slen;
548     
549     	dprintk("svc: tcp_accept %p sock %p\n", svsk, sock);
550     	if (!sock)
551     		return;
552     
553     	if (!(newsock = sock_alloc())) {
554     		printk(KERN_WARNING "%s: no more sockets!\n", serv->sv_name);
555     		return;
556     	}
557     	dprintk("svc: tcp_accept %p allocated\n", newsock);
558     
559     	newsock->type = sock->type;
560     	newsock->ops = ops = sock->ops;
561     
562     	if ((err = ops->accept(sock, newsock, O_NONBLOCK)) < 0) {
563     		if (net_ratelimit())
564     			printk(KERN_WARNING "%s: accept failed (err %d)!\n",
565     				   serv->sv_name, -err);
566     		goto failed;		/* aborted connection or whatever */
567     	}
568     
569     	slen = sizeof(sin);
570     	err = ops->getname(newsock, (struct sockaddr *) &sin, &slen, 1);
571     	if (err < 0) {
572     		if (net_ratelimit())
573     			printk(KERN_WARNING "%s: peername failed (err %d)!\n",
574     				   serv->sv_name, -err);
575     		goto failed;		/* aborted connection or whatever */
576     	}
577     
578     	/* Ideally, we would want to reject connections from unauthorized
579     	 * hosts here, but when we get encription, the IP of the host won't
580     	 * tell us anything. For now just warn about unpriv connections.
581     	 */
582     	if (ntohs(sin.sin_port) >= 1024) {
583     		if (net_ratelimit())
584     			printk(KERN_WARNING
585     				   "%s: connect from unprivileged port: %u.%u.%u.%u:%d\n",
586     				   serv->sv_name, 
587     				   NIPQUAD(sin.sin_addr.s_addr), ntohs(sin.sin_port));
588     	}
589     
590     	dprintk("%s: connect from %u.%u.%u.%u:%04x\n", serv->sv_name,
591     			NIPQUAD(sin.sin_addr.s_addr), ntohs(sin.sin_port));
592     
593     	if (!(newsvsk = svc_setup_socket(serv, newsock, &err, 0)))
594     		goto failed;
595     
596     	/* Precharge. Data may have arrived on the socket before we
597     	 * installed the data_ready callback. 
598     	 */
599     	spin_lock_bh(&newsvsk->sk_lock);
600     	newsvsk->sk_data = 1;
601     	newsvsk->sk_temp = 1;
602     	svc_sock_enqueue(newsvsk);
603     	spin_unlock_bh(&newsvsk->sk_lock);
604     
605     	if (serv->sv_stats)
606     		serv->sv_stats->nettcpconn++;
607     
608     	return;
609     
610     failed:
611     	sock_release(newsock);
612     	return;
613     }
614     
615     /*
616      * Receive data from a TCP socket.
617      */
618     static int
619     svc_tcp_recvfrom(struct svc_rqst *rqstp)
620     {
621     	struct svc_sock	*svsk = rqstp->rq_sock;
622     	struct svc_serv	*serv = svsk->sk_server;
623     	struct svc_buf	*bufp = &rqstp->rq_argbuf;
624     	int		len, ready, used;
625     
626     	dprintk("svc: tcp_recv %p data %d conn %d close %d\n",
627     			svsk, svsk->sk_data, svsk->sk_conn, svsk->sk_close);
628     
629     	if (svsk->sk_close) {
630     		svc_delete_socket(svsk);
631     		return 0;
632     	}
633     
634     	if (svsk->sk_conn) {
635     		svc_tcp_accept(svsk);
636     		svc_sock_accepted(svsk);
637     		return 0;
638     	}
639     
640     	ready = svsk->sk_data;
641     
642     	/* Receive data. If we haven't got the record length yet, get
643     	 * the next four bytes. Otherwise try to gobble up as much as
644     	 * possible up to the complete record length.
645     	 */
646     	if (svsk->sk_tcplen < 4) {
647     		unsigned long	want = 4 - svsk->sk_tcplen;
648     		struct iovec	iov;
649     
650     		iov.iov_base = ((char *) &svsk->sk_reclen) + svsk->sk_tcplen;
651     		iov.iov_len  = want;
652     		if ((len = svc_recvfrom(rqstp, &iov, 1, want)) < 0)
653     			goto error;
654     		svsk->sk_tcplen += len;
655     
656     		svsk->sk_reclen = ntohl(svsk->sk_reclen);
657     		if (!(svsk->sk_reclen & 0x80000000)) {
658     			/* FIXME: technically, a record can be fragmented,
659     			 *  and non-terminal fragments will not have the top
660     			 *  bit set in the fragment length header.
661     			 *  But apparently no known nfs clients send fragmented
662     			 *  records. */
663     			/* FIXME: shutdown socket */
664     			printk(KERN_NOTICE "RPC: bad TCP reclen %08lx",
665     			       (unsigned long) svsk->sk_reclen);
666     			return -EIO;
667     		}
668     		svsk->sk_reclen &= 0x7fffffff;
669     		dprintk("svc: TCP record, %d bytes\n", svsk->sk_reclen);
670     	}
671     
672     	/* Check whether enough data is available */
673     	len = svc_recv_available(svsk);
674     	if (len < 0)
675     		goto error;
676     
677     	if (len < svsk->sk_reclen) {
678     		/* FIXME: if sk_reclen > window-size, then we will
679     		 * never be able to receive the record, so should
680     		 * shutdown the connection
681     		 */
682     		dprintk("svc: incomplete TCP record (%d of %d)\n",
683     			len, svsk->sk_reclen);
684     		svc_sock_received(svsk, ready);
685     		return -EAGAIN;	/* record not complete */
686     	}
687     	/* if we think there is only one more record to read, but
688     	 * it is bigger than we expect, then two records must have arrived
689     	 * together, so pretend we aren't using the record.. */
690     	if (len > svsk->sk_reclen && ready == 1)
691     		used = 0;
692     	else	used = 1;
693     
694     	/* Frob argbuf */
695     	bufp->iov[0].iov_base += 4;
696     	bufp->iov[0].iov_len  -= 4;
697     
698     	/* Now receive data */
699     	len = svc_recvfrom(rqstp, bufp->iov, bufp->nriov, svsk->sk_reclen);
700     	if (len < 0)
701     		goto error;
702     
703     	dprintk("svc: TCP complete record (%d bytes)\n", len);
704     
705     	/* Position reply write pointer immediately after
706     	 * record length */
707     	rqstp->rq_resbuf.buf += 1;
708     	rqstp->rq_resbuf.len  = 1;
709     
710     	rqstp->rq_skbuff      = 0;
711     	rqstp->rq_argbuf.buf += 1;
712     	rqstp->rq_argbuf.len  = (len >> 2);
713     	rqstp->rq_prot	      = IPPROTO_TCP;
714     
715     	/* Reset TCP read info */
716     	svsk->sk_reclen = 0;
717     	svsk->sk_tcplen = 0;
718     
719     	svc_sock_received(svsk, used);
720     	if (serv->sv_stats)
721     		serv->sv_stats->nettcpcnt++;
722     
723     	return len;
724     
725     error:
726     	if (len == -EAGAIN) {
727     		dprintk("RPC: TCP recvfrom got EAGAIN\n");
728     		svc_sock_received(svsk, ready); /* Clear data ready */
729     	} else {
730     		printk(KERN_NOTICE "%s: recvfrom returned errno %d\n",
731     					svsk->sk_server->sv_name, -len);
732     		svc_sock_received(svsk, 0);
733     	}
734     
735     	return len;
736     }
737     
738     /*
739      * Send out data on TCP socket.
740      * FIXME: Make the sendto call non-blocking in order not to hang
741      * a daemon on a dead client. Requires write queue maintenance.
742      */
743     static int
744     svc_tcp_sendto(struct svc_rqst *rqstp)
745     {
746     	struct svc_buf	*bufp = &rqstp->rq_resbuf;
747     	int sent;
748     
749     	/* Set up the first element of the reply iovec.
750     	 * Any other iovecs that may be in use have been taken
751     	 * care of by the server implementation itself.
752     	 */
753     	bufp->iov[0].iov_base = bufp->base;
754     	bufp->iov[0].iov_len  = bufp->len << 2;
755     	bufp->base[0] = htonl(0x80000000|((bufp->len << 2) - 4));
756     
757     	sent = svc_sendto(rqstp, bufp->iov, bufp->nriov);
758     	if (sent != bufp->len<<2) {
759     		printk(KERN_NOTICE "rpc-srv/tcp: %s: sent only %d bytes of %d - should shutdown socket\n",
760     		       rqstp->rq_sock->sk_server->sv_name,
761     		       sent, bufp->len << 2);
762     		/* FIXME: should shutdown the socket, or allocate more memort
763     		 * or wait and try again or something.  Otherwise
764     		 * client will get confused
765     		 */
766     	}
767     	return sent;
768     }
769     
770     static int
771     svc_tcp_init(struct svc_sock *svsk)
772     {
773     	struct sock	*sk = svsk->sk_sk;
774     
775     	svsk->sk_recvfrom = svc_tcp_recvfrom;
776     	svsk->sk_sendto = svc_tcp_sendto;
777     
778     	if (sk->state == TCP_LISTEN) {
779     		dprintk("setting up TCP socket for listening\n");
780     		sk->data_ready = svc_tcp_listen_data_ready;
781     	} else {
782     		dprintk("setting up TCP socket for reading\n");
783     		sk->state_change = svc_tcp_state_change;
784     		sk->data_ready = svc_tcp_data_ready;
785     
786     		svsk->sk_reclen = 0;
787     		svsk->sk_tcplen = 0;
788     	}
789     
790     	return 0;
791     }
792     
793     /*
794      * Receive the next request on any socket.
795      */
796     int
797     svc_recv(struct svc_serv *serv, struct svc_rqst *rqstp, long timeout)
798     {
799     	struct svc_sock		*svsk;
800     	int			len;
801     	DECLARE_WAITQUEUE(wait, current);
802     
803     	dprintk("svc: server %p waiting for data (to = %ld)\n",
804     		rqstp, timeout);
805     
806     	if (rqstp->rq_sock)
807     		printk(KERN_ERR 
808     			"svc_recv: service %p, socket not NULL!\n",
809     			 rqstp);
810     	if (waitqueue_active(&rqstp->rq_wait))
811     		printk(KERN_ERR 
812     			"svc_recv: service %p, wait queue active!\n",
813     			 rqstp);
814     
815     	/* Initialize the buffers */
816     	rqstp->rq_argbuf = rqstp->rq_defbuf;
817     	rqstp->rq_resbuf = rqstp->rq_defbuf;
818     
819     	if (signalled())
820     		return -EINTR;
821     
822     	spin_lock_bh(&serv->sv_lock);
823     	if ((svsk = svc_sock_dequeue(serv)) != NULL) {
824     		rqstp->rq_sock = svsk;
825     		svsk->sk_inuse++;
826     	} else {
827     		/* No data pending. Go to sleep */
828     		svc_serv_enqueue(serv, rqstp);
829     
830     		/*
831     		 * We have to be able to interrupt this wait
832     		 * to bring down the daemons ...
833     		 */
834     		set_current_state(TASK_INTERRUPTIBLE);
835     		add_wait_queue(&rqstp->rq_wait, &wait);
836     		spin_unlock_bh(&serv->sv_lock);
837     
838     		schedule_timeout(timeout);
839     
840     		spin_lock_bh(&serv->sv_lock);
841     		remove_wait_queue(&rqstp->rq_wait, &wait);
842     
843     		if (!(svsk = rqstp->rq_sock)) {
844     			svc_serv_dequeue(serv, rqstp);
845     			spin_unlock_bh(&serv->sv_lock);
846     			dprintk("svc: server %p, no data yet\n", rqstp);
847     			return signalled()? -EINTR : -EAGAIN;
848     		}
849     	}
850     	spin_unlock_bh(&serv->sv_lock);
851     
852     	dprintk("svc: server %p, socket %p, inuse=%d\n",
853     		 rqstp, svsk, svsk->sk_inuse);
854     	len = svsk->sk_recvfrom(rqstp);
855     	dprintk("svc: got len=%d\n", len);
856     
857     	/* No data, incomplete (TCP) read, or accept() */
858     	if (len == 0 || len == -EAGAIN) {
859     		svc_sock_release(rqstp);
860     		return -EAGAIN;
861     	}
862     
863     	rqstp->rq_secure  = ntohs(rqstp->rq_addr.sin_port) < 1024;
864     	rqstp->rq_userset = 0;
865     	rqstp->rq_verfed  = 0;
866     
867     	svc_getlong(&rqstp->rq_argbuf, rqstp->rq_xid);
868     	svc_putlong(&rqstp->rq_resbuf, rqstp->rq_xid);
869     
870     	/* Assume that the reply consists of a single buffer. */
871     	rqstp->rq_resbuf.nriov = 1;
872     
873     	if (serv->sv_stats)
874     		serv->sv_stats->netcnt++;
875     	return len;
876     }
877     
878     /* 
879      * Drop request
880      */
881     void
882     svc_drop(struct svc_rqst *rqstp)
883     {
884     	dprintk("svc: socket %p dropped request\n", rqstp->rq_sock);
885     	svc_sock_release(rqstp);
886     }
887     
/*
 * Return reply to client.
 *
 * Frees the receive skb, transmits the reply through the socket's send
 * method and releases the socket.  Returns -EFAULT if the request has
 * no socket, 0 if the send failed with -ECONNREFUSED, -ENOTCONN or
 * -EAGAIN, and the send method's result otherwise.
 */
int
svc_send(struct svc_rqst *rqstp)
{
	struct svc_sock	*svsk = rqstp->rq_sock;
	int		status;

	if (svsk == NULL) {
		printk(KERN_WARNING "NULL socket pointer in %s:%d\n",
				__FILE__, __LINE__);
		return -EFAULT;
	}

	/* release the receive skb before sending the reply */
	svc_release_skb(rqstp);

	status = svsk->sk_sendto(rqstp);
	svc_sock_release(rqstp);

	switch (status) {
	case -ECONNREFUSED:
	case -ENOTCONN:
	case -EAGAIN:
		/* These failures are not reported to the caller. */
		return 0;
	default:
		return status;
	}
}
913     
914     /*
915      * Initialize socket for RPC use and create svc_sock struct
916      * XXX: May want to setsockopt SO_SNDBUF and SO_RCVBUF.
917      */
918     static struct svc_sock *
919     svc_setup_socket(struct svc_serv *serv, struct socket *sock,
920     					int *errp, int pmap_register)
921     {
922     	struct svc_sock	*svsk;
923     	struct sock	*inet;
924     
925     	dprintk("svc: svc_setup_socket %p\n", sock);
926     	if (!(svsk = kmalloc(sizeof(*svsk), GFP_KERNEL))) {
927     		*errp = -ENOMEM;
928     		return NULL;
929     	}
930     	memset(svsk, 0, sizeof(*svsk));
931     
932     	inet = sock->sk;
933     	inet->user_data = svsk;
934     	svsk->sk_sock = sock;
935     	svsk->sk_sk = inet;
936     	svsk->sk_ostate = inet->state_change;
937     	svsk->sk_odata = inet->data_ready;
938     	svsk->sk_server = serv;
939     	spin_lock_init(&svsk->sk_lock);
940     
941     	/* Initialize the socket */
942     	if (sock->type == SOCK_DGRAM)
943     		*errp = svc_udp_init(svsk);
944     	else
945     		*errp = svc_tcp_init(svsk);
946     if (svsk->sk_sk == NULL)
947     	printk(KERN_WARNING "svsk->sk_sk == NULL after svc_prot_init!\n");
948     
949     	/* Register socket with portmapper */
950     	if (*errp >= 0 && pmap_register)
951     		*errp = svc_register(serv, inet->protocol, ntohs(inet->sport));
952     
953     	if (*errp < 0) {
954     		inet->user_data = NULL;
955     		kfree(svsk);
956     		return NULL;
957     	}
958     
959     	spin_lock_bh(&serv->sv_lock);
960     	svsk->sk_list = serv->sv_allsocks;
961     	serv->sv_allsocks = svsk;
962     	spin_unlock_bh(&serv->sv_lock);
963     
964     	dprintk("svc: svc_setup_socket created %p (inet %p)\n",
965     				svsk, svsk->sk_sk);
966     	return svsk;
967     }
968     
969     /*
970      * Create socket for RPC service.
971      */
972     static int
973     svc_create_socket(struct svc_serv *serv, int protocol, struct sockaddr_in *sin)
974     {
975     	struct svc_sock	*svsk;
976     	struct socket	*sock;
977     	int		error;
978     	int		type;
979     
980     	dprintk("svc: svc_create_socket(%s, %d, %u.%u.%u.%u:%d)\n",
981     				serv->sv_program->pg_name, protocol,
982     				NIPQUAD(sin->sin_addr.s_addr),
983     				ntohs(sin->sin_port));
984     
985     	if (protocol != IPPROTO_UDP && protocol != IPPROTO_TCP) {
986     		printk(KERN_WARNING "svc: only UDP and TCP "
987     				"sockets supported\n");
988     		return -EINVAL;
989     	}
990     	type = (protocol == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
991     
992     	if ((error = sock_create(PF_INET, type, protocol, &sock)) < 0)
993     		return error;
994     
995     	if (sin != NULL) {
996     		error = sock->ops->bind(sock, (struct sockaddr *) sin,
997     						sizeof(*sin));
998     		if (error < 0)
999     			goto bummer;
1000     	}
1001     
1002     	if (protocol == IPPROTO_TCP) {
1003     		if ((error = sock->ops->listen(sock, 5)) < 0)
1004     			goto bummer;
1005     	}
1006     
1007     	if ((svsk = svc_setup_socket(serv, sock, &error, 1)) != NULL)
1008     		return 0;
1009     
1010     bummer:
1011     	dprintk("svc: svc_create_socket error = %d\n", -error);
1012     	sock_release(sock);
1013     	return error;
1014     }
1015     
1016     /*
1017      * Remove a dead socket
1018      */
1019     void
1020     svc_delete_socket(struct svc_sock *svsk)
1021     {
1022     	struct svc_sock	**rsk;
1023     	struct svc_serv	*serv;
1024     	struct sock	*sk;
1025     
1026     	dprintk("svc: svc_delete_socket(%p)\n", svsk);
1027     
1028     	serv = svsk->sk_server;
1029     	sk = svsk->sk_sk;
1030     
1031     	sk->state_change = svsk->sk_ostate;
1032     	sk->data_ready = svsk->sk_odata;
1033     
1034     	spin_lock_bh(&serv->sv_lock);
1035     
1036     	for (rsk = &serv->sv_allsocks; *rsk; rsk = &(*rsk)->sk_list) {
1037     		if (*rsk == svsk)
1038     			break;
1039     	}
1040     	if (!*rsk) {
1041     		spin_unlock_bh(&serv->sv_lock);
1042     		return;
1043     	}
1044     	*rsk = svsk->sk_list;
1045     	if (svsk->sk_qued)
1046     		rpc_remove_list(&serv->sv_sockets, svsk);
1047     
1048     
1049     	svsk->sk_dead = 1;
1050     
1051     	if (!svsk->sk_inuse) {
1052     		spin_unlock_bh(&serv->sv_lock);
1053     		sock_release(svsk->sk_sock);
1054     		kfree(svsk);
1055     	} else {
1056     		spin_unlock_bh(&serv->sv_lock);
1057     		printk(KERN_NOTICE "svc: server socket destroy delayed\n");
1058     		/* svsk->sk_server = NULL; */
1059     	}
1060     }
1061     
1062     /*
1063      * Make a socket for nfsd and lockd
1064      */
1065     int
1066     svc_makesock(struct svc_serv *serv, int protocol, unsigned short port)
1067     {
1068     	struct sockaddr_in	sin;
1069     
1070     	dprintk("svc: creating socket proto = %d\n", protocol);
1071     	sin.sin_family      = AF_INET;
1072     	sin.sin_addr.s_addr = INADDR_ANY;
1073     	sin.sin_port        = htons(port);
1074     	return svc_create_socket(serv, protocol, &sin);
1075     }
1076     
1077