免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 3333 | 回复: 8
打印 上一主题 下一主题

[C] 原子写TCP SOCKET的内核补丁做出来了 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2010-02-04 09:16 |只看该作者 |倒序浏览
本帖最后由 思一克 于 2010-02-04 09:17 编辑

LINUX的TCP SOCKET写不是原子的。多个进程或线程写同一个SOCKET, 读出端得到的数据是交叉的。为了不交叉,写的进程们必须自己加锁。

该补丁就是解决这个的。是应最近的几个帖子的问题而写的。欢迎测试。

[PATCH 2.6.27.7-9-pae #7 SMP 1/1] networking tcp: Writing tcp socket be atomic
from: John Ye
Writing tcp socket is not atomic in current kernel. When a socket is written by
multi-processes or threads,the other end will read interleaved garbage data.

This simple patch is to resolve this issue, to make the stream socket writing
be atomic under certain data size limit.

Similar to file system pipe ( with a max atomic write limit ), an atomic
socket can be written by multi processes or threads.

But it’s more than pipe. The pipe can only be used by multi processes in a
local system, the atomic stream socket can be used remotely to send data
among machines without user level locking involved.

How to test this patch:
1) apply the patch to kernel and modules, reboot from the new patched kernel
2) #define TCP_ATOMIC 20 in your test.c (TCP_ATOMIC is defined as 20 in kernel)
3) create a tcp socket, set the atomic option.
for example:
int val = 512;
int len = 4;
if(setsockopt(s, IPPROTO_TCP, TCP_ATOMIC, &val, len) == -1) {
perror("setsockopt");
return -1 ;
}
will set the atomic max data size to 512 bytes

to get the current atomic size for socket s,
val = 0;
len = 4;
if(getsockopt(s, IPPROTO_TCP, TCP_ATOMIC, &val, &len) == -1) {
perror("setsockopt");
return -1 ;
}

4) Then, connect to a tcp server, fork a child process.
let both main process and child process write() or send() its own data block to the server.
>From the server, the received data bytes will be interleaved if no TCP_ATOMIC is set.
(I have a testing c code ready)

Signed-off-by: John Ye (Seeker) johnye@webizmail.com

---

--- linux/net/ipv4/tcp.c 2008-12-05 09:48:57.000000000 +0800
+++ linux/net/ipv4/tcp.c 2010-02-03 15:15:11.000000000 +0800
@@ -822,6 +822,7 @@
int mss_now, size_goal;
int err, copied;
long timeo;
+ int atomic; /* is atomic write? johnye. Feb 2, 2010 */

lock_sock(sk);
TCP_CHECK_TIMER(sk);
@@ -849,6 +850,11 @@
if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN))
goto do_error;

+
+ /* for multi-seg data or too big chunk, no atomic. johnye. */
+ atomic = tp->atomic_size;
+ if(iovlen > 1 || iov->iov_len > atomic) atomic = 0;
+
while (--iovlen >= 0) {
int seglen = iov->iov_len;
unsigned char __user *from = iov->iov_base;
@@ -889,14 +895,28 @@
if (copy > seglen)
copy = seglen;

+ /* if atomic write. johnye */
+ if (atomic)
+ copy = seglen;
+
/* Where to copy to? */
if (skb_tailroom(skb) > 0) {
/* We have some space in skb head. Superb! */
- if (copy > skb_tailroom(skb))
+ /* consider atomic write, johnye */
+ if (copy > skb_tailroom(skb)) {
+ if(atomic)
+ goto skb_page_start; /* q mark yet, johnye */
+
copy = skb_tailroom(skb);
+ }
if ((err = skb_add_data(skb, from, copy)) != 0)
goto do_fault;
- } else {
+
+ goto skb_page_done;
+ //} else {
+ }
+ skb_page_start:
+ {
int merge = 0;
int i = skb_shinfo(skb)->nr_frags;
struct page *page = TCP_PAGE(sk);
@@ -925,8 +945,17 @@
} else
off = 0;

- if (copy > PAGE_SIZE - off)
- copy = PAGE_SIZE - off;
+ /* consider atomic write, johnye */
+ if (copy > PAGE_SIZE - off) {
+ if (atomic && page) {
+ put_page(page);
+ TCP_PAGE(sk) = page = NULL;
+ off = 0;
+ merge = 0;
+ } else {
+ copy = PAGE_SIZE - off;
+ }
+ }

if (!sk_wmem_schedule(sk, copy))
goto wait_for_memory;
@@ -968,6 +997,7 @@

TCP_OFF(sk) = off + copy;
}
+ skb_page_done:

if (!copied)
TCP_SKB_CB(skb)->flags &= ~TCPCB_FLAG_PSH;
@@ -2019,6 +2049,16 @@
lock_sock(sk);

switch (optname) {
+
+ /* set the atomic write max size. johnye */
+ case TCP_ATOMIC:
+ if(val > 1024) {
+ err = -EINVAL;
+ break;
+ }
+ tp->atomic_size = val;
+ break;
+
case TCP_MAXSEG:
/* Values greater than interface MTU won't take effect. However
* at the point when this call is done we typically don't yet
@@ -2276,6 +2316,12 @@
return -EINVAL;

switch (optname) {
+
+ /* get the atomic write max size. johnye */
+ case TCP_ATOMIC:
+ val = tp->atomic_size;
+ break;
+
case TCP_MAXSEG:
val = tp->mss_cache;
if (!val && ((1 << sk->sk_state) & (TCPF_CLOSE | TCPF_LISTEN)))
--- linux/include/linux/tcp.h 2008-10-10 06:13:53.000000000 +0800
+++ linux/include/linux/tcp.h 2010-02-03 13:54:55.000000000 +0800
@@ -97,6 +97,8 @@
#define TCP_CONGESTION 13 /* Congestion control algorithm */
#define TCP_MD5SIG 14 /* TCP MD5 Signature (RFC2385) */

+#define TCP_ATOMIC 20 /* atomic TCP socket writting */
+
#define TCPI_OPT_TIMESTAMPS 1
#define TCPI_OPT_SACK 2
#define TCPI_OPT_WSCALE 4
@@ -411,6 +413,7 @@
#endif

int linger2;
+ u32 atomic_size; /* for atomic tcp socket write, johnye. Feb 2, 2010 */
};

static inline struct tcp_sock *tcp_sk(const struct sock *sk)

论坛徽章:
0
2 [报告]
发表于 2010-02-04 09:27 |只看该作者
得留个记号。。。

论坛徽章:
1
2015年辞旧岁徽章
日期:2015-03-03 16:54:15
3 [报告]
发表于 2010-02-04 09:34 |只看该作者
虽然在包的概念上原子了,但发包的顺序还得自己控制,我觉得意义不大

论坛徽章:
1
申猴
日期:2014-02-11 14:50:31
4 [报告]
发表于 2010-02-04 09:43 |只看该作者
主要是tcp有缓冲区,怎么保证缓冲区的数据是正确的顺序呢?这是关键

论坛徽章:
0
5 [报告]
发表于 2010-02-04 09:44 |只看该作者
虽然在包的概念上原子了,但发包的顺序还得自己控制,我觉得意义不大
cookis 发表于 2010-02-04 09:34


如果包是原子了,顺序是无关的。
我们经常用pipe做这个。比如一个进程fork了10个子进程做工作,将工作结果通过pipe向主进程“回报”。因为子进程的工作步调是不同的,汇报的时间也是不定的。
所有包的顺序不是问题。只要发包是原子的即可保证正确的数据不乱。

论坛徽章:
0
6 [报告]
发表于 2010-02-04 09:50 |只看该作者
主要是tcp有缓冲区,怎么保证缓冲区的数据是正确的顺序呢?这是关键
chenzhanyiczy 发表于 2010-02-04 09:43


这里没有不同的进程发的包之间顺序性问题呀。也不要求有顺序。

TCP能保证数据流的顺序性。就是:你用send写26个字节ABCDEF.....YZ,接收端如果能得到数据,一定不会乱序的。
否则就不是STREAM SOCKET了。

这个补丁使得,进程一次send的数据(比如大小小于1K)一定是作为一个整体发送的。不需要应用加锁即可。

论坛徽章:
0
7 [报告]
发表于 2010-02-04 11:40 |只看该作者
现在这个看起来好累,你能不能把合并后的代码(copy 数据那个函数)贴一份出来呢,应该是发现缓冲区不够就进入memory wait而一个字节都不能先copy进去才能保证原子,,

论坛徽章:
0
8 [报告]
发表于 2010-02-04 11:50 |只看该作者
补丁后的tcp_sendmsg代码

  1. int tcp_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
  2.                 size_t size)
  3. {
  4.         struct sock *sk = sock->sk;
  5.         struct iovec *iov;
  6.         struct tcp_sock *tp = tcp_sk(sk);
  7.         struct sk_buff *skb;
  8.         int iovlen, flags;
  9.         int mss_now, size_goal;
  10.         int err, copied;
  11.         long timeo;
  12.         int atomic;     /* is atomic write? johnye. Feb 2, 2010 */

  13.         lock_sock(sk);
  14.         TCP_CHECK_TIMER(sk);

  15.         flags = msg->msg_flags;
  16.         timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);

  17.         /* Wait for a connection to finish. */
  18.         if ((1 << sk->sk_state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT))
  19.                 if ((err = sk_stream_wait_connect(sk, &timeo)) != 0)
  20.                         goto out_err;

  21.         /* This should be in poll */
  22.         clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);

  23.         mss_now = tcp_current_mss(sk, !(flags&MSG_OOB));
  24.         size_goal = tp->xmit_size_goal;

  25.         /* Ok commence sending. */
  26.         iovlen = msg->msg_iovlen;
  27.         iov = msg->msg_iov;
  28.         copied = 0;

  29.         err = -EPIPE;
  30.         if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN))
  31.                 goto do_error;


  32.         /* for multi-seg data or too big chunk, no atomic. johnye. */
  33.         atomic = tp->atomic_size;
  34.         if(iovlen > 1 || iov->iov_len > atomic) atomic = 0;

  35.         while (--iovlen >= 0) {
  36.                 int seglen = iov->iov_len;
  37.                 unsigned char __user *from = iov->iov_base;

  38.                 iov++;

  39.                 while (seglen > 0) {
  40.                         int copy;

  41.                         skb = tcp_write_queue_tail(sk);

  42.                         if (!tcp_send_head(sk) ||
  43.                             (copy = size_goal - skb->len) <= 0) {

  44. new_segment:
  45.                                 /* Allocate new segment. If the interface is SG,
  46.                                  * allocate skb fitting to single page.
  47.                                  */
  48.                                 if (!sk_stream_memory_free(sk))
  49.                                         goto wait_for_sndbuf;

  50.                                 skb = sk_stream_alloc_skb(sk, select_size(sk),
  51.                                                 sk->sk_allocation);
  52.                                 if (!skb)
  53.                                         goto wait_for_memory;

  54.                                 /*
  55.                                  * Check whether we can use HW checksum.
  56.                                  */
  57.                                 if (sk->sk_route_caps & NETIF_F_ALL_CSUM)
  58.                                         skb->ip_summed = CHECKSUM_PARTIAL;

  59.                                 skb_entail(sk, skb);
  60.                                 copy = size_goal;
  61.                         }

  62.                         /* Try to append data to the end of skb. */
  63.                         if (copy > seglen)
  64.                                 copy = seglen;

  65.                         /* if atomic write. johnye */
  66.                         if (atomic)
  67.                                 copy = seglen;

  68.                         /* Where to copy to? */
  69.                         if (skb_tailroom(skb) > 0) {
  70.                                 /* We have some space in skb head. Superb! */
  71.                                 /* consider atomic write, johnye */
  72.                                 if (copy > skb_tailroom(skb)) {
  73.                                         if(atomic)
  74.                                             goto skb_page_start;        /* q mark yet, johnye */

  75.                                         copy = skb_tailroom(skb);
  76.                                 }
  77.                                 if ((err = skb_add_data(skb, from, copy)) != 0)
  78.                                         goto do_fault;

  79.                                 goto skb_page_done;
  80.                         //} else {
  81.                         }
  82.                         skb_page_start:
  83.                         {
  84.                                 int merge = 0;
  85.                                 int i = skb_shinfo(skb)->nr_frags;
  86.                                 struct page *page = TCP_PAGE(sk);
  87.                                 int off = TCP_OFF(sk);

  88.                                 if (skb_can_coalesce(skb, i, page, off) &&
  89.                                     off != PAGE_SIZE) {
  90.                                         /* We can extend the last page
  91.                                          * fragment. */
  92.                                         merge = 1;
  93.                                 } else if (i == MAX_SKB_FRAGS ||
  94.                                            (!i &&
  95.                                            !(sk->sk_route_caps & NETIF_F_SG))) {
  96.                                         /* Need to add new fragment and cannot
  97.                                          * do this because interface is non-SG,
  98.                                          * or because all the page slots are
  99.                                          * busy. */
  100.                                         tcp_mark_push(tp, skb);
  101.                                         goto new_segment;
  102.                                 } else if (page) {
  103.                                         if (off == PAGE_SIZE) {
  104.                                                 put_page(page);
  105.                                                 TCP_PAGE(sk) = page = NULL;
  106.                                                 off = 0;
  107.                                         }
  108.                                 } else
  109.                                         off = 0;

  110.                                 /* consider atomic write, johnye */
  111.                                 if (copy > PAGE_SIZE - off) {
  112.                                         if (atomic && page) {
  113.                                                 put_page(page);
  114.                                                 TCP_PAGE(sk) = page = NULL;
  115.                                                 off = 0;
  116.                                                 merge = 0;
  117.                                         } else {
  118.                                                 copy = PAGE_SIZE - off;
  119.                                         }
  120.                                 }

  121.                                 if (!sk_wmem_schedule(sk, copy))
  122.                                         goto wait_for_memory;

  123.                                 if (!page) {
  124.                                         /* Allocate new cache page. */
  125.                                         if (!(page = sk_stream_alloc_page(sk)))
  126.                                                 goto wait_for_memory;
  127.                                 }

  128.                                 /* Time to copy data. We are close to
  129.                                  * the end! */
  130.                                 err = skb_copy_to_page(sk, from, skb, page,
  131.                                                        off, copy);
  132.                                 if (err) {
  133.                                         /* If this page was new, give it to the
  134.                                          * socket so it does not get leaked.
  135.                                          */
  136.                                         if (!TCP_PAGE(sk)) {
  137.                                                 TCP_PAGE(sk) = page;
  138.                                                 TCP_OFF(sk) = 0;
  139.                                         }
  140.                                         goto do_error;
  141.                                 }

  142.                                 /* Update the skb. */
  143.                                 if (merge) {
  144.                                         skb_shinfo(skb)->frags[i - 1].size +=
  145.                                                                         copy;
  146.                                 } else {
  147.                                         skb_fill_page_desc(skb, i, page, off, copy);
  148.                                         if (TCP_PAGE(sk)) {
  149.                                                 get_page(page);
  150.                                         } else if (off + copy < PAGE_SIZE) {
  151.                                                 get_page(page);
  152.                                                 TCP_PAGE(sk) = page;
  153.                                         }
  154.                                 }

  155.                                 TCP_OFF(sk) = off + copy;
  156.                         }
  157.                         skb_page_done:

  158.                         if (!copied)
  159.                                 TCP_SKB_CB(skb)->flags &= ~TCPCB_FLAG_PSH;

  160.                         tp->write_seq += copy;
  161.                         TCP_SKB_CB(skb)->end_seq += copy;
  162.                         skb_shinfo(skb)->gso_segs = 0;

  163.                         from += copy;
  164.                         copied += copy;
  165.                         if ((seglen -= copy) == 0 && iovlen == 0)
  166.                                 goto out;

  167.                         if (skb->len < size_goal || (flags & MSG_OOB))
  168.                                 continue;

  169.                         if (forced_push(tp)) {
  170.                                 tcp_mark_push(tp, skb);
  171.                                 __tcp_push_pending_frames(sk, mss_now, TCP_NAGLE_PUSH);
  172.                         } else if (skb == tcp_send_head(sk))
  173.                                 tcp_push_one(sk, mss_now);
  174.                         continue;

  175. wait_for_sndbuf:
  176.                         set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
  177. wait_for_memory:
  178.                         if (copied)
  179.                                 tcp_push(sk, flags & ~MSG_MORE, mss_now, TCP_NAGLE_PUSH);

  180.                         if ((err = sk_stream_wait_memory(sk, &timeo)) != 0)
  181.                                 goto do_error;

  182.                         mss_now = tcp_current_mss(sk, !(flags&MSG_OOB));
  183.                         size_goal = tp->xmit_size_goal;
  184.                 }
  185.         }

  186. out:
  187.         if (copied)
  188.                 tcp_push(sk, flags, mss_now, tp->nonagle);
  189.         TCP_CHECK_TIMER(sk);
  190.         release_sock(sk);
  191.         return copied;

  192. do_fault:
  193.         if (!skb->len) {
  194.                 tcp_unlink_write_queue(skb, sk);
  195.                 /* It is the one place in all of TCP, except connection
  196.                  * reset, where we can be unlinking the send_head.
  197.                  */
  198.                 tcp_check_send_head(sk, skb);
  199.                 sk_wmem_free_skb(sk, skb);
  200.         }

  201. do_error:
  202.         if (copied)
  203.                 goto out;
  204. out_err:
  205.         err = sk_stream_error(sk, flags, err);
  206.         TCP_CHECK_TIMER(sk);
  207.         release_sock(sk);
  208.         return err;
  209. }


复制代码

论坛徽章:
1
黑曼巴
日期:2020-02-27 22:54:26
9 [报告]
发表于 2010-02-04 13:12 |只看该作者
提示: 作者被禁止或删除 内容自动屏蔽
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP