基于 epoll 系统调用实现非阻塞的echo server:
这部分代码是 Go BIO/NIO探讨(7):IO多路复用之epoll 的附录
package main
import (
"fmt"
"log"
"net"
"os"
"os/signal"
"syscall"
)
func ipToSockaddrInet4(ip net.IP, port int) (syscall.SockaddrInet4, error) {
if len(ip) == 0 {
ip = net.IPv4zero
}
ip4 := ip.To4()
if ip4 == nil {
return syscall.SockaddrInet4{}, &net.AddrError{Err: "non-IPv4 address", Addr: ip.String()}
}
sa := syscall.SockaddrInet4{Port: port}
copy(sa.Addr[:], ip4)
return sa, nil
}
func isError(ev uint32) bool {
return (ev&uint32(syscall.EPOLLERR)) > 0 || (ev&uint32(syscall.EPOLLHUP)) > 0 || (ev&syscall.EPOLLIN) == 0
}
func main() {
var (
family = syscall.AF_INET
sotype = syscall.SOCK_STREAM
_ = "tcp"
listenBacklog = syscall.SOMAXCONN
serverip = net.IPv4(0, 0, 0, 0)
serverport = 8080
)
// 创建套接字
sockfd, err := syscall.Socket(family, sotype, 0)
if err != nil {
panic(fmt.Errorf("fails to create socket: %s", err))
}
syscall.CloseOnExec(sockfd)
// epoll edge-triggered 模式支持nonblock
if err := syscall.SetNonblock(sockfd, true); err != nil {
syscall.Close(sockfd)
panic(fmt.Errorf("setnonblock error=%v", err))
}
// 接收到Ctrl+C信号后,关闭socket
c := make(chan os.Signal)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
log.Println("r- Ctrl+C pressed in Terminal")
if err := syscall.Close(sockfd); err != nil {
log.Printf("Close sockfd %d fails, err=%v
", sockfd, err)
} else {
log.Printf("Server stopped successfully!!!")
}
// 收到信号后需要处理, 否则程序会永久hang住, 需要kill -9
// os.Exit 会导致所有goroutine都会立即停止执行
os.Exit(0)
}()
addr, err := ipToSockaddrInet4(serverip, serverport)
if err != nil {
panic(fmt.Sprintf("fails to convert address %s:%d to socket addr, err=%s", serverip, serverport, err))
}
if err := syscall.Bind(sockfd, &addr); err != nil {
panic(fmt.Sprintf("fails to bind socket %d to address %s:%d, err=%s",
sockfd,
serverip, serverport,
err))
}
if err := syscall.Listen(sockfd, listenBacklog); err != nil {
log.Printf("listen sockfd %d to addr error=%v
", sockfd, err)
panic(fmt.Sprintf("fails to listen socket %d", sockfd))
} else {
log.Printf("Started listening on %s:%d", serverip, serverport)
}
epfd, err := syscall.EpollCreate1(0)
if err != nil {
panic(fmt.Sprintf("create epoll instance fails, err=%s", err))
}
// 默认是 level-triggered,效率更高的poll
epEvent := syscall.EpollEvent{
Fd: int32(sockfd),
Events: uint32(syscall.EPOLLIN) | uint32(-syscall.EPOLLET),
}
if err := syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, sockfd, &epEvent); err != nil {
panic(fmt.Errorf("epoll_ctl %v fails, err=%s", epfd, err))
}
events := make([]syscall.EpollEvent, 128, 128)
var buf [32 * 1024]byte
for {
// msec < 0, EpollWait 会被阻塞直到有一个 fd 可用
nReady, err := syscall.EpollWait(epfd, events, -1)
if err != nil {
log.Printf("epoll_wait error=%v
", err)
panic(fmt.Errorf("epoll_wait error=%v", err))
}
for i := 0; i < nReady; i++ {
ev := &events[i]
if isError(ev.Events) {
/* An error has occured on this fd, or the socket is not
ready for reading (why were we notified then?) */
log.Printf("epoll error: %s
", err)
// 取消监听
_ = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_DEL, int(ev.Fd), ev)
_ = syscall.Close(int(events[i].Fd))
continue
}
if ev.Fd == int32(sockfd) {
for {
// 监听套接字(server端套接字
clientfd, _, err := syscall.Accept(sockfd)
if err == syscall.EAGAIN || err == syscall.EWOULDBLOCK {
// 所有新创建的tcp conn均已被处理
break
}
// 设置为nonblock
if err := syscall.SetNonblock(clientfd, true); err != nil {
log.Printf("fails to set client socket %v as nonblock, err=%s
", clientfd, err)
continue
}
epEvent.Fd = int32(clientfd)
epEvent.Events = uint32(syscall.EPOLLIN) | uint32(-syscall.EPOLLET)
if err := syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, clientfd, &epEvent); err != nil {
log.Printf("register client socket %v fails, err=%s
", clientfd, err)
syscall.Close(clientfd)
continue
}
}
} else {
// 已连接套接字 tcp conn
for {
nRead, err := syscall.Read(int(ev.Fd), buf[:])
if err == syscall.EAGAIN || err == syscall.EWOULDBLOCK {
// 数据已经读完了
break
} else if err != nil {
log.Printf("fails to read data from sockfd %d, err=%v
", ev.Fd, err)
_ = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_DEL, int(ev.Fd), ev)
_ = syscall.Close(int(ev.Fd))
break
} else if nRead == 0 { // EOF
// Client closed
log.Printf("client sock %d closed
", ev.Fd)
_ = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_DEL, int(ev.Fd), ev)
_ = syscall.Close(int(ev.Fd))
break
} else {
log.Printf("read %d bytes from sock %d
", nRead, ev.Fd)
if _, err := syscall.Write(int(ev.Fd), buf[:nRead]); err != nil {
log.Printf("fails to write data %s into sockfd %d, err=%v
", buf[:nRead], sockfd, err)
}
}
}
}
}
}
}
页面更新:2024-03-12
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号