Skip to content

Commit

Permalink
feat: add concurrent uring benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
zjregee committed Jul 5, 2024
1 parent ee5942c commit c6be920
Show file tree
Hide file tree
Showing 7 changed files with 282 additions and 98 deletions.
2 changes: 2 additions & 0 deletions benchmark/run_benchmark.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
go build -v -o ./net/bench_serial ./net/serial
go build -v -o ./net/bench_concurrent ./net/concurrent
go build -v -o ./uring/bench_serial ./uring/serial
go build -v -o ./uring/bench_concurrent ./uring/concurrent

./net/bench_serial
./net/bench_concurrent
./uring/bench_serial
./uring/bench_concurrent
116 changes: 116 additions & 0 deletions benchmark/uring/concurrent/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package main

import (
"os"
"net"
"fmt"
"time"
"sync"
"bufio"
"math/rand"

"github.com/sirupsen/logrus"
)

func runServer(port string, stopChan chan interface{}, logger *logrus.Logger) {
listener, err := net.Listen("tcp", port)
if err != nil {
panic("shouldn't failed here")
}

go func() {
for {
conn, err := listener.Accept()
if err != nil {
logger.Warnf("error occurred when accept: %s", err.Error())
continue
}
handleConnection(conn, logger)
}
}()

go func() {
<- stopChan
listener.Close()
}()
}

func handleConnection(conn net.Conn, logger *logrus.Logger) {
connection := connection{}
connection.init(conn, logger)
go connection.run()
}

const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

func randomString(length int) string {
seed := rand.New(rand.NewSource(time.Now().UnixNano()))
b := make([]byte, length)
for i := range b {
b[i] = charset[seed.Intn(len(charset))]
}
return string(b)
}

func main() {
port := ":8000"
logger := logrus.New()
logger.SetOutput(os.Stdout)
logger.SetLevel(logrus.InfoLevel)
stopchan := make(chan interface{})
runServer(port, stopchan, logger)
defer close(stopchan)

c := 12
m := 10000
n := 100
messageLength := 48

start := time.Now()

var wg sync.WaitGroup
for i := 0; i < c; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < m; j++ {
conn, err := net.Dial("tcp", port)
if err != nil {
fmt.Printf("failed to connect to server: %v\n", err)
conn.Close()
continue
}

for k := 0; k < n; k++ {
message := randomString(messageLength) + "\n"
_, err = conn.Write([]byte(message))
if err != nil {
fmt.Printf("failed to send message: %v\n", err)
break
}

response, err := bufio.NewReader(conn).ReadString('\n')
if err != nil {
fmt.Printf("failed to read response: %v\n", err)
break
}

if message != response {
fmt.Printf("%v %v %v failed\n", i, j, k)
fmt.Printf("expect: %s\n", message)
fmt.Printf("actual: %s\n", response)
break
}
}

conn.Close()
}
}()
}
wg.Wait()

elapsed := time.Since(start)
minutes := int(elapsed.Minutes())
seconds := int(elapsed.Seconds()) % 60
fmt.Printf("the total time for uring to execute %dk connections, with %d writes per connection, is: %d minutes %d seconds\n", m / 1000, n, minutes, seconds)
}
104 changes: 104 additions & 0 deletions benchmark/uring/concurrent/uring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package main

import (
"os"
"net"

"github.com/zjregee/anet"
"github.com/sirupsen/logrus"
)

const BUFFER_SIZE = 1024

type connection struct {
fd int
file *os.File
conn net.Conn
logger *logrus.Logger
readTrigger chan error
writeTrigger chan error
size int
buffer []byte
operator *anet.FDOperator
}

func (c *connection) init(conn net.Conn, logger *logrus.Logger) error {
file, err := conn.(*net.TCPConn).File()
if err != nil {
return err
}
c.fd = int(file.Fd())
c.file = file
c.conn = conn
c.logger = logger
c.readTrigger = make(chan error)
c.writeTrigger = make(chan error)
c.buffer = make([]byte, BUFFER_SIZE)
c.size = 0
return c.initFDOperator()
}

func (c *connection) initFDOperator() error {
ring := anet.RingManager.Pick()
op := ring.Alloc()
op.FD = c.fd
op.OnRead = c.onRead
op.OnWrite = c.onWrite
op.Ring = ring
op.Register()
c.operator = op
return nil
}

func (c *connection) close() {
c.operator.Free()
c.file.Close()
c.conn.Close()
}

func (c *connection) run() {
defer c.close()

for {
readEventData := anet.PrepReadEventData{}
readEventData.Size = BUFFER_SIZE
readEventData.Data = c.buffer
err := c.operator.Submit(anet.RingPrepRead, readEventData)
if err != nil {
c.logger.Warnf("error occurred when submit read: %s", err.Error())
return
}
err = <-c.readTrigger
if err != nil {
c.logger.Warnf("error occurred when read: %s", err.Error())
return
}

if (c.size == 0) {
return
}

writeEventData := anet.PrepWriteEventData{}
writeEventData.Size = c.size
writeEventData.Data = c.buffer
err = c.operator.Submit(anet.RingPRepWrite, writeEventData)
if err != nil {
c.logger.Warnf("error occurred when submit write: %s", err.Error())
return
}
err = <-c.writeTrigger
if err != nil {
c.logger.Warnf("error occurred when write: %s", err.Error())
return
}
}
}

func (c *connection) onRead(n int, err error) {
c.size = n
c.readTrigger <- err
}

func (c *connection) onWrite(n int, err error) {
c.writeTrigger <- err
}
22 changes: 13 additions & 9 deletions operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,26 @@ package anet

type FDOperator struct {
FD int
OnRead func(n int) error
OnWrite func(n int) error
ring Ring
OnRead func(n int, err error)
OnWrite func(n int, err error)
Ring Ring
}

func (op *FDOperator) submit(evnet RingEvent, eventData interface{}) error {
return op.ring.Submit(op, evnet, eventData)
func (op *FDOperator) Submit(evnet RingEvent, eventData interface{}) error {
return op.Ring.Submit(op, evnet, eventData)
}

func (op *FDOperator) free() {
op.ring.Free(op)
func (op *FDOperator) Register() {
op.Ring.Register(op)
}

func (op *FDOperator) reset() {
func (op *FDOperator) Free() {
op.Ring.Free(op)
}

func (op *FDOperator) Reset() {
op.FD = 0
op.OnRead = nil
op.OnWrite = nil
op.ring = nil
op.Ring = nil
}
9 changes: 5 additions & 4 deletions ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@ type Ring interface {
Submit(operator *FDOperator, event RingEvent, eventData interface{}) error
Alloc() *FDOperator
Free(operator *FDOperator)
Register(operator *FDOperator)
Close() error
}

type PrepReadEventData struct {
size int
data []byte
Size int
Data []byte
}

type PrepWriteEventData struct {
size int
data []byte
Size int
Data []byte
}

type RingEvent int
Expand Down
Loading

0 comments on commit c6be920

Please sign in to comment.