- 论坛徽章:
- 11
|
我也发一个, 刚写完
- #include "list_head.h"
- #include "fdset.h"
- #include "fdset_in.h"
- #include "mem.h"
- #include "lock.h"
- #include "now.h"
- #include "mbuf.h"
- #include "utils.h"
- #include "fdctxpool.h"
- #include <unistd.h>
- #include <sys/socket.h>
- #include <sys/epoll.h>
- #include <sys/eventfd.h>
- #include <sys/timerfd.h>
- #include <string.h>
/*
 * Event mask used for every epoll registration in this file: the caller's
 * events plus one-shot and edge-triggered delivery.  The argument is
 * parenthesized so that an expression such as `a ? b : c` expands correctly.
 */
#define epoll_mask(x) ((x) | EPOLLONESHOT | EPOLLET)

/* Values of fd_contex::stat; looper() and fd_contex_free() claim a context
 * by compare-and-swapping stat_ready -> stat_busy. */
#define stat_ready 0
#define stat_busy 1
- typedef struct tag_fdset {
- int fd, efd, tfd;
- int threads;
- int fdctx_lck, timer_lck;
- struct list_head fdctx_head;
- struct list_head timer_head;
- } fdset;
- typedef struct tag_timer_notify {
- uint32_t id;
- uint32_t tms;
- fd_contex* fdctx;
- struct list_head fdset_entry;
- struct list_head fdctx_entry;
- } timer_notify;
- struct my_buffer* buffer_reserve(fd_contex* fdctx, uint32_t bytes)
- {
- struct my_buffer* mbuf = NULL;
- if (fdctx->read_mbuf != NULL) {
- mbuf = fdctx->read_mbuf;
- fdctx->read_mbuf = NULL;
- } else {
- mbuf = mbuf_alloc_2(bytes);
- }
- return mbuf;
- }
- static int io_read(fd_contex* fdctx)
- {
- int n, error = 0;
- struct fd_struct* fds = fdctx->fds;
- struct my_buffer* mbuf = NULL;
- if (fds->bytes > 0) {
- LABEL:
- mbuf = buffer_reserve(fdctx, fds->bytes);
- if (mbuf != NULL) {
- if (fds->tcp0_udp1 == 0) {
- n = do_tcp_read(fds->fd, mbuf);
- if (n == -1) {
- error = errno;
- }
- } else {
- n = do_udp_read(fds->fd, mbuf);
- if (n == -1) {
- error = EAGAIN;
- }
- }
- } else {
- error = ENOMEM;
- }
- }
- if (error != EWOULDBLOCK && error != EAGAIN) {
- n = fds->read(fds, mbuf, error);
- if (n > 0) {
- fds->bytes = n;
- goto LABEL;
- }
- } else {
- fds->bytes = mbuf->length;
- fdctx->read_mbuf = mbuf;
- n = 0;
- }
- return n;
- }
- static int io_write(fd_contex* fdctx)
- {
- int n;
- struct sockaddr_in* in;
- fdctx->fds->mask &= (~EPOLLOUT);
- struct my_buffer* mbuf = NULL;
- while (1) {
- if (fdctx->write_mbuf != NULL) {
- mbuf = fdctx->write_mbuf;
- fdctx->write_mbuf = NULL;
- } else {
- mbuf = fdctx->fds->write(fdctx->fds, &in);
- }
- if (mbuf == NULL) {
- break;
- }
- if (fdctx->fds->tcp0_udp1 == 0) {
- n = do_tcp_write(fdctx->fds->fd, mbuf);
- } else {
- n = do_udp_write(fdctx->fds->fd, mbuf, in);
- }
- if (mbuf->length != 0) {
- fdctx->write_mbuf = mbuf;
- fdctx->fds->mask |= EPOLLOUT;
- break;
- }
- mbuf->mop->free(mbuf);
- }
- return n;
- }
- static int io_proc(fd_contex* fdctx, uint32_t mask)
- {
- int n = 0;
- struct fd_struct* fds = fdctx->fds;
- if (mask & (EPOLLERR | EPOLLHUP)) {
- fds->read(fds, NULL, EBADF);
- return -1;
- }
- if (mask & (EPOLLIN | EPOLLPRI)) {
- n = io_read(fdctx);
- if (n == -1) {
- return -1;
- }
- }
- if (mask & EPOLLRDHUP) {
- n = fds->read(fds, NULL, ESHUTDOWN);
- if (n == -1) {
- return -1;
- }
- }
- if (mask & EPOLLOUT) {
- n = io_write(fdctx);
- }
- fdctx->ev.events = epoll_mask(fdctx->fds->mask);
- if (n == -1) {
- n = fds->read(fds, NULL, errno);
- }
- return n;
- }
- static void timer_expires(fdset* fset)
- {
- fd_contex* fdctx;
- uint32_t tms = now();
- LIST_HEAD(head);
- struct list_head* ent;
- lock(&fset->timer_lck);
- for (ent = fdctx->timer_head.next; ent != &fdctx->timer_head; ent = ent->next) {
- timer_notify* notify = list_entry(ent, timer_notify, fdset_entry);
- if (notify->tms > tms) {
- break;
- }
- fdctx = notify->fdctx;
- handle_clone(fdctx->self);
- list_del_init(¬ify->fdset_entry);
- list_del(¬ify->fdctx_entry);
- list_add(¬ify->fdctx_entry, &head);
- }
- unlock(&fset->timer_lck);
- while (!list_empty(&head)) {
- timer_notify* notify = list_entry(head.next, timer_notify, fdctx_entry);
- list_del(head.next);
- fdctx = notify->fdctx;
- fdctx->fds->notify(fdctx->fds, notify->id);
- handle_release(fdctx->self);
- my_free(notify);
- }
- }
- static void looper(void* any, int fd, int efd)
- {
- struct epoll_event events;
- my_handle* handle = (my_handle *) any;
- while (1) {
- int nr = epoll_wait(fd, &events, 1, -1);
- fdset* fset = (fdset *) handle_get(handle);
- if (__builtin_expect(fset == NULL, 0)) {
- break;
- }
- if (__builtin_expect(nr == 1, 1)) {
- fd_contex* fdctx = (fd_contex *) events.data.ptr;
- if (__builtin_expect(fdctx != NULL, 1)) {
- lock(&fdctx->busy_lck);
- int stat = __sync_val_compare_and_swap(&fdctx->stat, stat_ready, stat_busy);
- unlock(&fdctx->busy_lck);
- if (stat == stat_ready) {
- handle_get(fdctx->self);
- while (fdctx->busy);
- fdctx->ev.events = 0;
- nr = io_proc(fdctx, events.events);
- if (nr != -1) {
- epoll_ctl(fd, EPOLL_CTL_MOD, fdctx->fds->fd, &fdctx->ev);
- }
- fdctx->stat = stat_ready;
- handle_put(fdctx->self);
- }
- } else {
- timer_expires(fset);
- }
- } else if (nr != -1 || EINTR != errno) {
- printf("what's up? n = %d, errno = %d\n", nr, errno);
- }
- handle_put(handle);
- }
- int n = handle_release(handle);
- if (n == 0) {
- epoll_ctl(fd, EPOLL_CTL_DEL, efd, NULL);
- close(efd);
- close(fd);
- }
- }
- static void fset_delete(void* addr)
- {
- fdset* fset = (fdset *) addr;
- epoll_ctl(fset->fd, EPOLL_CTL_DEL, fset->tfd, NULL);
- close(fset->tfd);
- while (!list_empty(&fset->timer_head)) {
- struct list_head* ent = fset->timer_head.next;
- list_del_init(ent);
- }
- while (!list_empty(&fset->fdctx_head)) {
- struct list_head* ent = fset->fdctx_head.next;
- list_del(ent);
- fd_contex* fdctx = list_entry(ent, fd_contex, set_entry);
- epoll_ctl(fset->fd, EPOLL_CTL_DEL, fdctx->fds->fd, NULL);
- handle_dettach(fdctx->self);
- }
- if (fset->threads != 0) {
- uint64_t n = 1;
- write(fset->efd, &n, sizeof(n));
- } else {
- epoll_ctl(fset->fd, EPOLL_CTL_DEL, fset->efd, NULL);
- close(fset->efd);
- close(fset->fd);
- }
- my_free(fset);
- }
- int fdset_join(void* set)
- {
- my_handle* hset = (my_handle *) set;
- fdset* fset = (fdset *) handle_get(hset);
- if (fset == NULL) {
- errno = -EBADF;
- return -1;
- }
- int fd = fset->fd;
- int efd = fset->efd;
- __sync_add_and_fetch(&fset->threads, 1);
- handle_clone(hset);
- handle_put(hset);
- looper(hset, fd, efd);
- return 0;
- }
- void* fdset_new(uint32_t size)
- {
- struct epoll_event event;
- int tfd = timerfd_create(CLOCK_MONOTONIC, 0);
- if (tfd == -1) {
- return NULL;
- }
- struct itimerspec timerspec = {
- .it_interval = {0, 0},
- .it_value = {0, 0}
- };
- int n = timerfd_settime(tfd, 0, &timerspec, NULL);
- if (n == -1) {
- goto LABEL5;
- }
- int fd = epoll_create(size);
- if (fd == -1) {
- goto LABEL5;
- }
- event.events = epoll_mask(EPOLLIN);
- event.data.ptr = NULL;
- n = epoll_ctl(fd, EPOLL_CTL_ADD, tfd, &event);
- if (n == -1) {
- goto LABEL4;
- }
- int efd = eventfd(0, 0);
- if (efd == -1) {
- goto LABEL3;
- }
- event.data.fd = fd;
- event.events = EPOLLIN;
- n = epoll_ctl(fd, EPOLL_CTL_ADD, efd, &event);
- if (n == -1) {
- goto LABEL2;
- }
- errno = -ENOMEM;
- fdset* fset = (fdset *) my_malloc(sizeof(fdset));
- if (fset == NULL) {
- goto LABEL1;
- }
- my_handle* handle = handle_attach(fset, fset_delete);
- if (handle == NULL) {
- goto LABEL0;
- }
- INIT_LIST_HEAD(&fset->fdctx_head);
- INIT_LIST_HEAD(&fset->timer_head);
- fset->fdctx_lck = 0;
- fset->timer_lck = 0;
- fset->fd = fd;
- fset->efd = efd;
- fset->threads = 0;
- fset->tfd = tfd;
- return (void *) handle;
- LABEL0:
- my_free(fset);
- LABEL1:
- epoll_ctl(fd, EPOLL_CTL_DEL, efd, NULL);
- LABEL2:
- close(efd);
- LABEL3:
- epoll_ctl(fd, EPOLL_CTL_DEL, tfd, NULL);
- LABEL4:
- close(fd);
- LABEL5:
- close(tfd);
- return NULL;
- }
- static void unlink_from_fdset(fdset* fset, fd_contex* fdctx)
- {
- lock(&fset->fdctx_lck);
- list_del(&fdctx->set_entry);
- unlock(&fset->fdctx_lck);
- struct list_head* ent = fdctx->timer_head.next;
- if (!list_empty(ent)) {
- lock(&fset->timer_lck);
- for (; ent != &fdctx->timer_head; ent = ent->next) {
- timer_notify* notify = list_entry(ent, timer_notify, fdctx_entry);
- list_del_init(¬ify->fdset_entry);
- }
- unlock(&fset->timer_lck);
- }
- struct fd_struct* fds = fdctx->fds;
- epoll_ctl(fset->fd, EPOLL_CTL_DEL, fds->fd, NULL);
- }
- static void fd_contex_free(void* addr)
- {
- fd_contex* fdctx = (fd_contex *) addr;
- while (1) {
- int stat = __sync_val_compare_and_swap(&fdctx->stat, stat_ready, stat_busy);
- if (stat == stat_ready) {
- break;
- }
- sched_yield();
- }
- int need_notify = 1;
- fdset* fset = (fdset *) handle_get(fdctx->hset);
- if (fset != NULL) {
- if (!list_empty(&fdctx->set_entry)) {
- unlink_from_fdset(fset, fdctx);
- } else {
- need_notify = 0;
- }
- handle_release(fdctx->self);
- handle_put(fdctx->hset);
- }
- handle_release(fdctx->hset);
- while (!list_empty(&fdctx->timer_head)) {
- struct list_head* ent = fdctx->timer_head.next;
- list_del(ent);
- timer_notify* notify = list_entry(ent, timer_notify, fdctx_entry);
- my_free(notify);
- }
- if (need_notify == 1) {
- struct fd_struct* fds = fdctx->fds;
- fds->detach(fds);
- }
- if (fdctx->read_mbuf != NULL) {
- fdctx->read_mbuf->mop->free(fdctx->read_mbuf);
- }
- if (fdctx->write_mbuf != NULL) {
- fdctx->write_mbuf->mop->free(fdctx->write_mbuf);
- }
- fd_contex_put(fdctx);
- }
- my_handle* fdset_attach_fd(void* set, struct fd_struct* fds)
- {
- if (fds->fd == -1) {
- errno = -EINVAL;
- return NULL;
- }
- if (-1 == make_none_block(fds->fd)) {
- return NULL;
- }
- my_handle* hset = (my_handle *) set;
- fdset* fset = (fdset *) handle_get(hset);
- if (fset == NULL) {
- errno = -EBADF;
- return NULL;
- }
- errno = -ENOMEM;
- fd_contex* fdctx = fd_contex_get();
- if (fdctx == NULL) {
- handle_put(hset);
- return NULL;
- }
- my_handle* handle = handle_attach(fdctx, fd_contex_free);
- if (handle == NULL) {
- fd_contex_put(fdctx);
- handle_put(hset);
- return NULL;
- }
- handle_clone(handle);
- fdctx->self = handle;
- handle_clone(hset);
- fdctx->hset = hset;
- fdctx->fds = fds;
- fdctx->read_mbuf = NULL;
- fdctx->write_mbuf = NULL;
- fdctx->busy = 0;
- fdctx->busy_lck = 0;
- INIT_LIST_HEAD(&fdctx->set_entry);
- INIT_LIST_HEAD(&fdctx->timer_head);
- fdctx->ev.events = epoll_mask(fds->mask);
- fdctx->ev.data.ptr = fdctx;
- int n = epoll_ctl(fset->fd, EPOLL_CTL_ADD, fds->fd, &fdctx->ev);
- if (-1 == n) {
- handle_release(handle);
- handle_dettach(handle);
- handle = NULL;
- } else {
- lock(&fset->fdctx_lck);
- list_add(&fdctx->set_entry, &fset->fdctx_head);
- unlock(&fset->fdctx_lck);
- fdctx->stat = stat_ready;
- }
- handle_put(hset);
- return handle;
- }
- void fdset_delete(void* hset)
- {
- handle_dettach((my_handle *) hset);
- }
- int fdset_update_fdmask(my_handle* handle)
- {
- errno = -EBADF;
- fd_contex* fdctx = (fd_contex *) handle_get(handle);
- if (fdctx == NULL) {
- return -1;
- }
- fdset* fset = (fdset *) handle_get(fdctx->hset);
- if (fset == NULL) {
- handle_put(handle);
- return -1;
- }
- struct fd_struct* fds = fdctx->fds;
- lock(&fdctx->busy_lck);
- fdctx->busy = 1;
- unlock(&fdctx->busy_lck);
- uint32_t old_mask = fdctx->ev.events;
- uint32_t new_mask = epoll_mask(fds->mask);
- if (old_mask == new_mask || fdctx->stat == stat_busy) {
- fdctx->busy = 0;
- handle_put(fdctx->hset);
- handle_put(handle);
- return 0;
- }
- fdctx->ev.events = new_mask;
- int n = epoll_ctl(fset->fd, EPOLL_CTL_MOD, fds->fd, &fdctx->ev);
- if (n == -1) {
- fdctx->ev.events = old_mask;
- }
- fdctx->busy = 0;
- handle_put(fdctx->hset);
- handle_put(handle);
- return n;
- }
- static int fdset_add_timer(fdset* fset, timer_notify* notify, uint32_t ms)
- {
- struct list_head* ent;
- lock(&fset->timer_lck);
- for (ent = fset->timer_head.prev; ent != &fset->timer_head; ent = ent->prev) {
- timer_notify* cur = list_entry(ent, timer_notify, fdset_entry);
- if (cur->tms <= notify->tms) {
- break;
- }
- }
- int n = 0;
- if (ent == &fset->timer_head) {
- struct itimerspec timerspec;
- timerspec.it_value.tv_sec = ms / 1000;
- timerspec.it_value.tv_nsec = (ms - timerspec.it_value.tv_sec * 1000) * 1000000;
- timerspec.it_interval.tv_sec = 0;
- timerspec.it_interval.tv_nsec = 0;
- n = timerfd_settime(fset->tfd, 0, &timerspec, NULL);
- }
- if (n == 0) {
- list_add(¬ify->fdset_entry, ent);
- }
- unlock(&fset->timer_lck);
- return n;
- }
- int fdset_sched(my_handle* handle, uint32_t ms, uint32_t id)
- {
- fd_contex* fdctx = (fd_contex *) handle_get(handle);
- if (fdctx == NULL) {
- errno = -EBADF;
- return -1;
- }
- fdset* fset = (fdset *) handle_get(fdctx->hset);
- assert(fset != NULL);
- int n = -1;
- errno = -ENOMEM;
- timer_notify* notify = (timer_notify *) my_malloc(sizeof(timer_notify));
- if (notify != NULL) {
- notify->id = id;
- notify->tms = now() + ms;
- notify->fdctx = fdctx;
- list_add(¬ify->fdctx_entry, &fdctx->timer_head);
- n = fdset_add_timer(fset, notify, ms);
- if (n == -1) {
- my_free(notify);
- } else {
- handle_clone(handle);
- }
- }
- handle_put(fdctx->hset);
- handle_put(handle);
- return n;
- }
- struct fd_struct* fd_struct_from(void* any)
- {
- if (any == NULL) {
- return NULL;
- }
- fd_contex* fdctx = (fd_contex *) any;
- struct fd_struct* fds = fdctx->fds;
- return fds;
- }
复制代码 |
|