Skip to content

Commit

Permalink
feat: add basic code
Browse files Browse the repository at this point in the history
  • Loading branch information
zjregee committed Jul 1, 2024
0 parents commit 014399a
Show file tree
Hide file tree
Showing 27 changed files with 1,857 additions and 0 deletions.
32 changes: 32 additions & 0 deletions .github/workflows/e2e_test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
name: Go E2E Test

on:
push:
branches:
- main
pull_request:
branches:
- main

jobs:
build:
name: E2E Test
runs-on: ubuntu-latest

steps:
- name: Checkout Code
uses: actions/checkout@v4

- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: 1.22.0

- name: Install Dependencies
run: |
chmod +x ./setup.sh
./setup.sh
go mod tidy
- name: Run Tests
run: go test -v ./e2e_test/...
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.local/
.vscode/
.devcontainer/
examples/output/
120 changes: 120 additions & 0 deletions anet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package anet

import (
"net"
"sync"
"context"
"runtime"
)

func CreateListener(network, addr string) (net.Listener, error) {
return net.Listen(network, addr)
}

func NewEventLoop(onRequest OnRequest, ops ...Option) (EventLoop, error) {
opts := &options{
onRequest: onRequest,
}
for _, do := range ops {
do.f(opts)
}
return &eventLoop{
opts: opts,
stop: make(chan error, 1),
}, nil
}

type EventLoop interface {
Serve(ln net.Listener) error
ServeNonBlocking(ln net.Listener) error
Shutdown() error
ShutdownWithContext(ctx context.Context) error
}

type OnRequest func(ctx context.Context, connection Connection) error

type OnConnect func(ctx context.Context, connection Connection) context.Context

func init() {
log = newDefaultLogger()
n := (runtime.GOMAXPROCS(0) - 1) / 20 + 1
ringmanager = newDefaultRingManager(n)
}

type eventLoop struct {
sync.Mutex
opts *options
svr *server
stop chan error
}

func (evl *eventLoop) Serve(ln net.Listener) error {
evl.Lock()
evl.svr = newServer(ln, evl.opts, func(err error) {
evl.Lock()
if evl.svr == nil {
evl.Unlock()
return
}
evl.svr = nil
evl.Unlock()
select {
case evl.stop <- err:
default:
}
})
go evl.svr.run()
evl.Unlock()

err := <-evl.stop
if err != nil {
log.Error("[EventLoop] error eccoured while serving")
}
return err
}

func (evl *eventLoop) ServeNonBlocking(ln net.Listener) error {
evl.Lock()
evl.svr = newServer(ln, evl.opts, func(err error) {
evl.Lock()
if evl.svr == nil {
evl.Unlock()
return
}
evl.svr = nil
evl.Unlock()
select {
case evl.stop <- err:
default:
}
})
go evl.svr.run()
evl.Unlock()
return nil
}

func (evl *eventLoop) Shutdown() error {
return evl.ShutdownWithContext(context.Background())
}

func (evl *eventLoop) ShutdownWithContext(ctx context.Context) error {
evl.Lock()
if evl.svr == nil {
evl.Unlock()
return nil
}
svr := evl.svr
evl.svr = nil
evl.Unlock()

log.Info("[EventLoop] closed by user")
select {
case evl.stop <- nil:
default:
}
err := svr.close(ctx)
if err != nil {
log.Error("[EventLoop] error occured while closing")
}
return err
}
50 changes: 50 additions & 0 deletions anet_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package anet

import (
"time"

"github.com/sirupsen/logrus"
)

func SetLogger(logger *logrus.Logger) {
setLogger(logger)
}

func SetLoggerLevel(level logrus.Level) {
setLoggerLevel(level)
}

func WithOnConnect(onConnect OnConnect) Option {
return Option{
f: func(op *options) {
op.onConnect = onConnect
},
}
}

func WithReadTimeout(timeout time.Duration) Option {
return Option{
f: func(op *options) {
op.readTimeout = timeout
},
}
}

func WithWriteTimeout(timeout time.Duration) Option {
return Option{
f: func(op *options) {
op.writeTimeout = timeout
},
}
}

type Option struct {
f func(*options)
}

type options struct {
onConnect OnConnect
onRequest OnRequest
readTimeout time.Duration
writeTimeout time.Duration
}
90 changes: 90 additions & 0 deletions anet_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package anet

import (
"net"
"sync"
"time"
"context"
"strings"
)

func newServer(ln net.Listener, opts *options, onQuit func(err error)) *server {
return &server{
ln: ln,
opts: opts,
onQuit: onQuit,
}
}

type server struct {
ln net.Listener
opts *options
connections sync.Map
onQuit func(err error)
}

func (s *server) run() {
for {
log.Info("waiting for connection accept")
conn, err := s.ln.Accept()
if err != nil {
if strings.Contains(err.Error(), "closed") {
log.Error("connection accepted with closed, server quit")
s.onQuit(err)
return
}
log.Warn("onnection accepted with error, wait 10 ms for retry")
time.Sleep(10 * time.Millisecond)
continue
}
log.Info("connection accepted")
go s.onAccept(conn)
}
}

func (s *server) onAccept(conn net.Conn) {
fd, err := conn.(*net.TCPConn).File()
if err != nil {
log.Error("can't get connection's fd")
return
}
connection := &connection{}
connection.init(conn, s.opts)
connection.AddCloseCallback(func(connection Connection) error {
s.connections.Delete(fd.Fd())
return nil
})
s.connections.Store(fd.Fd(), connection)
connection.onPrepare()
}

func (s *server) close(ctx context.Context) error {
err := s.ln.Close()
if err != nil {
log.Error("error occured while close listener")
}
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
var hasConn bool
for {
hasConn = false
s.connections.Range(func(key, value interface{}) bool {
hasConn = true
connection := value.(*connection)
err := connection.Close()
if err != nil {
log.Error("error occured while close connection")
}
return true
})
if !hasConn {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
continue
}
}
}
24 changes: 24 additions & 0 deletions buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package anet

type Reader interface {
Seek(n int) ([]byte, error)
SeekAck(n int) error
ReadAll() ([]byte, error)
ReadUtil(n int) ([]byte, error)
ReadBytes(n int) ([]byte, error)
ReadString(n int) (string, error)
Len() int
}

type Writer interface {
Book(n int) []byte
BookAck(n int) error
WriteBytes(data []byte, n int) error
WriteString(data string, n int) error
Flush() error
}

type ReadWriter interface {
Reader
Writer
}
Loading

0 comments on commit 014399a

Please sign in to comment.