diff --git a/anet.go b/anet.go index 4ef4a63..2d90969 100644 --- a/anet.go +++ b/anet.go @@ -4,7 +4,7 @@ import ( "net" "sync" "context" - "runtime" + // "runtime" ) func CreateListener(network, addr string) (net.Listener, error) { @@ -37,8 +37,8 @@ type OnConnect func(ctx context.Context, connection Connection) context.Context func init() { log = newDefaultLogger() - n := (runtime.GOMAXPROCS(0) - 1) / 20 + 1 - ringmanager = newDefaultRingManager(n) + // n := (runtime.GOMAXPROCS(0) - 1) / 20 + 1 + RingManager = newDefaultRingManager(4) } type eventLoop struct { @@ -114,7 +114,7 @@ func (evl *eventLoop) ShutdownWithContext(ctx context.Context) error { } err := svr.close(ctx) if err != nil { - log.Error("[EventLoop] error occured while closing") + log.Error("[EventLoop] error occurred while closing") } return err } diff --git a/anet_server.go b/anet_server.go index 24f0957..c036178 100644 --- a/anet_server.go +++ b/anet_server.go @@ -59,7 +59,7 @@ func (s *server) onAccept(conn net.Conn) { func (s *server) close(ctx context.Context) error { err := s.ln.Close() if err != nil { - log.Error("error occured while close listener") + log.Error("error occurred while close listener") } ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -71,7 +71,7 @@ func (s *server) close(ctx context.Context) error { connection := value.(*connection) err := connection.Close() if err != nil { - log.Error("error occured while close connection") + log.Error("error occurred while close connection") } return true }) diff --git a/connection_impl.go b/connection_impl.go index 390152b..f8bb087 100644 --- a/connection_impl.go +++ b/connection_impl.go @@ -148,8 +148,8 @@ func (c *connection) init(conn net.Conn, opts *options) { c.writeTrigger = make(chan error) c.readLoopTrigger = make(chan error) c.writeLoopTrigger = make(chan error) - c.inputBuffer = newBytesBuffer(1024) - c.outputBuffer = newBytesBuffer(1024) + c.inputBuffer = newBytesBuffer(4096) + c.outputBuffer = newBytesBuffer(4096) c.state = 0 c.id = uuid.New().String()[:8] c.initNetFD(conn) @@ -166,18 +166,18 @@ func (c *connection) initNetFD(conn net.Conn) { } func (c *connection) initFDOperator() { - ring := ringmanager.pick() + ring := RingManager.Pick() op := ring.Alloc() op.FD = c.fd op.OnRead = c.onRead op.OnWrite = c.onWrite - op.ring = ring + op.Ring = ring c.operator = op } func (c *connection) initFinalizer() { c.AddCloseCallback(func(connection Connection) error { - c.operator.free() + c.operator.Free() return nil }) } diff --git a/connection_reactor.go b/connection_reactor.go index c3b632a..f047c93 100644 --- a/connection_reactor.go +++ b/connection_reactor.go @@ -8,31 +8,23 @@ const ( defaultReadSize = 1024 ) -func (c *connection) onRead(n int) error { +func (c *connection) onRead(n int, err error) { log.Infof("[conn %s] has read %d bytes of new data", c.id, n) - err := c.inputBuffer.BookAck(n) - if err != nil { - return err - } + c.inputBuffer.BookAck(n) processed := c.onRequest() if !processed && n >= int(atomic.LoadInt32(&c.waitReadSize)) { // there is a goroutine waiting for enough data c.triggerRead(nil) } c.triggerReadLoop(nil) - return nil } -func (c *connection) onWrite(n int) error { +func (c *connection) onWrite(n int, err error) { log.Infof("[conn %s] has write %d bytes of new data", c.id, n) - err := c.outputBuffer.SeekAck(n) - if err != nil { - return err - } + c.outputBuffer.SeekAck(n) c.unlock(flushing) c.triggerWrite(nil) c.triggerWriteLoop(nil) - return nil } func (c *connection) onHup() error { @@ -83,10 +75,11 @@ func (c *connection) onClose() error { func (c *connection) readLoop() { for { log.Infof("[conn %s] sumbit prep read event", c.id) - eventData := PrepReadEventData{} - eventData.size = defaultReadSize - eventData.data = c.inputBuffer.Book(defaultReadSize) - c.operator.submit(RingPrepRead, eventData) + eventData := RingEventData{} + eventData.Size = defaultReadSize + eventData.Data = c.inputBuffer.Book(defaultReadSize) + eventData.Event = RingPrepRead + c.operator.Submit(eventData) err := <-c.readLoopTrigger if err != nil { log.Infof("[conn %s] read loop quit since connection closed", c.id) @@ -100,14 +93,15 @@ func (c *connection) writeLoop() { size := c.outputBuffer.Len() if size > 0 && c.lock(flushing) { log.Infof("[conn %s] sumbit prep write event", c.id) - eventData := PrepWriteEventData{} - eventData.size = size - eventData.data, _ = c.outputBuffer.Seek(size) - c.operator.submit(RingPRepWrite, eventData) + eventData := RingEventData{} + eventData.Size = size + eventData.Data, _ = c.outputBuffer.Seek(size) + eventData.Event = RingPrepWrite + c.operator.Submit(eventData) } err := <-c.writeLoopTrigger if err != nil { - log.Infof("[conn %s] read loop quit since connection closed", c.id) + log.Infof("[conn %s] write loop quit since connection closed", c.id) return } } diff --git a/examples/echo_server/main.go b/examples/echo_server/main.go index 0fb7589..b0c5023 100644 --- a/examples/echo_server/main.go +++ b/examples/echo_server/main.go @@ -39,15 +39,15 @@ func handler(ctx context.Context, connection anet.Connection) error { defer connection.Close() data, err := reader.ReadAll(); if err != nil { - fmt.Println("error occured while reading connection: {}", err.Error()) + fmt.Println("error occurred while reading connection: {}", err.Error()) } err = writer.WriteBytes(data, len(data)); if err != nil { - fmt.Println("error occured while writing connection: {}", err.Error()) + fmt.Println("error occurred while writing connection: {}", err.Error()) } err = writer.Flush() if err != nil { - fmt.Println("error occured while flushing connection: {}", err.Error()) + fmt.Println("error occurred while flushing connection: {}", err.Error()) } return nil }