Skip to content

Commit

Permalink
fix: resolve build error
Browse files Browse the repository at this point in the history
  • Loading branch information
zjregee committed Jul 8, 2024
1 parent 744bc39 commit 5be301a
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 35 deletions.
8 changes: 4 additions & 4 deletions anet.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"net"
"sync"
"context"
"runtime"
// "runtime"
)

func CreateListener(network, addr string) (net.Listener, error) {
Expand Down Expand Up @@ -37,8 +37,8 @@ type OnConnect func(ctx context.Context, connection Connection) context.Context

func init() {
log = newDefaultLogger()
n := (runtime.GOMAXPROCS(0) - 1) / 20 + 1
ringmanager = newDefaultRingManager(n)
// n := (runtime.GOMAXPROCS(0) - 1) / 20 + 1
RingManager = newDefaultRingManager(4)
}

type eventLoop struct {
Expand Down Expand Up @@ -114,7 +114,7 @@ func (evl *eventLoop) ShutdownWithContext(ctx context.Context) error {
}
err := svr.close(ctx)
if err != nil {
log.Error("[EventLoop] error occured while closing")
log.Error("[EventLoop] error occurred while closing")
}
return err
}
4 changes: 2 additions & 2 deletions anet_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (s *server) onAccept(conn net.Conn) {
func (s *server) close(ctx context.Context) error {
err := s.ln.Close()
if err != nil {
log.Error("error occured while close listener")
log.Error("error occurred while close listener")
}
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
Expand All @@ -71,7 +71,7 @@ func (s *server) close(ctx context.Context) error {
connection := value.(*connection)
err := connection.Close()
if err != nil {
log.Error("error occured while close connection")
log.Error("error occurred while close connection")
}
return true
})
Expand Down
10 changes: 5 additions & 5 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ func (c *connection) init(conn net.Conn, opts *options) {
c.writeTrigger = make(chan error)
c.readLoopTrigger = make(chan error)
c.writeLoopTrigger = make(chan error)
c.inputBuffer = newBytesBuffer(1024)
c.outputBuffer = newBytesBuffer(1024)
c.inputBuffer = newBytesBuffer(4096)
c.outputBuffer = newBytesBuffer(4096)
c.state = 0
c.id = uuid.New().String()[:8]
c.initNetFD(conn)
Expand All @@ -166,18 +166,18 @@ func (c *connection) initNetFD(conn net.Conn) {
}

func (c *connection) initFDOperator() {
ring := ringmanager.pick()
ring := RingManager.Pick()
op := ring.Alloc()
op.FD = c.fd
op.OnRead = c.onRead
op.OnWrite = c.onWrite
op.ring = ring
op.Ring = ring
c.operator = op
}

func (c *connection) initFinalizer() {
c.AddCloseCallback(func(connection Connection) error {
c.operator.free()
c.operator.Free()
return nil
})
}
Expand Down
36 changes: 15 additions & 21 deletions connection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,23 @@ const (
defaultReadSize = 1024
)

func (c *connection) onRead(n int) error {
func (c *connection) onRead(n int, err error) {
log.Infof("[conn %s] has read %d bytes of new data", c.id, n)
err := c.inputBuffer.BookAck(n)
if err != nil {
return err
}
c.inputBuffer.BookAck(n)
processed := c.onRequest()
if !processed && n >= int(atomic.LoadInt32(&c.waitReadSize)) {
// there is a goroutine waiting for enough data
c.triggerRead(nil)
}
c.triggerReadLoop(nil)
return nil
}

func (c *connection) onWrite(n int) error {
func (c *connection) onWrite(n int, err error) {
log.Infof("[conn %s] has write %d bytes of new data", c.id, n)
err := c.outputBuffer.SeekAck(n)
if err != nil {
return err
}
c.outputBuffer.SeekAck(n)
c.unlock(flushing)
c.triggerWrite(nil)
c.triggerWriteLoop(nil)
return nil
}

func (c *connection) onHup() error {
Expand Down Expand Up @@ -83,10 +75,11 @@ func (c *connection) onClose() error {
func (c *connection) readLoop() {
for {
log.Infof("[conn %s] sumbit prep read event", c.id)
eventData := PrepReadEventData{}
eventData.size = defaultReadSize
eventData.data = c.inputBuffer.Book(defaultReadSize)
c.operator.submit(RingPrepRead, eventData)
eventData := RingEventData{}
eventData.Size = defaultReadSize
eventData.Data = c.inputBuffer.Book(defaultReadSize)
eventData.Event = RingPrepRead
c.operator.Submit(eventData)
err := <-c.readLoopTrigger
if err != nil {
log.Infof("[conn %s] read loop quit since connection closed", c.id)
Expand All @@ -100,14 +93,15 @@ func (c *connection) writeLoop() {
size := c.outputBuffer.Len()
if size > 0 && c.lock(flushing) {
log.Infof("[conn %s] sumbit prep write event", c.id)
eventData := PrepWriteEventData{}
eventData.size = size
eventData.data, _ = c.outputBuffer.Seek(size)
c.operator.submit(RingPRepWrite, eventData)
eventData := RingEventData{}
eventData.Size = size
eventData.Data, _ = c.outputBuffer.Seek(size)
eventData.Event = RingPrepWrite
c.operator.Submit(eventData)
}
err := <-c.writeLoopTrigger
if err != nil {
log.Infof("[conn %s] read loop quit since connection closed", c.id)
log.Infof("[conn %s] write loop quit since connection closed", c.id)
return
}
}
Expand Down
6 changes: 3 additions & 3 deletions examples/echo_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ func handler(ctx context.Context, connection anet.Connection) error {
defer connection.Close()
data, err := reader.ReadAll();
if err != nil {
fmt.Println("error occured while reading connection: {}", err.Error())
fmt.Println("error occurred while reading connection: {}", err.Error())
}
err = writer.WriteBytes(data, len(data));
if err != nil {
fmt.Println("error occured while writing connection: {}", err.Error())
fmt.Println("error occurred while writing connection: {}", err.Error())
}
err = writer.Flush()
if err != nil {
fmt.Println("error occured while flushing connection: {}", err.Error())
fmt.Println("error occurred while flushing connection: {}", err.Error())
}
return nil
}

0 comments on commit 5be301a

Please sign in to comment.