Skip to content

Commit

Permalink
Comment logs and channel spreader
Browse files Browse the repository at this point in the history
  • Loading branch information
jrauh01 committed Nov 27, 2023
1 parent 7333404 commit b7d3bce
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 11 deletions.
32 changes: 21 additions & 11 deletions pkg/sync/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"time"
)

// LogSync syncs logs to database. Therefore, it maintains a list
// of pod elements to get logs from
type LogSync struct {
list []*kcorev1.Pod
lastChecked map[[20]byte]*kmetav1.Time
Expand All @@ -30,6 +32,7 @@ type LogSync struct {
logger *logging.Logger
}

// NewLogSync creates new LogSync initialized with clientset, database and logger
func NewLogSync(clientset *kubernetes.Clientset, db *database.DB, logger *logging.Logger) *LogSync {
return &LogSync{
list: []*kcorev1.Pod{},
Expand All @@ -41,6 +44,19 @@ func NewLogSync(clientset *kubernetes.Clientset, db *database.DB, logger *loggin
}
}

// upsertStmt returns database upsert statement
func (ls *LogSync) upsertStmt() string {
return fmt.Sprintf(
"INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s",
"log",
"id, reference_id, container_name, time, log",
":id, :reference_id, :container_name, :time, :log",
"time=CONCAT(time, '\n', :time), log=CONCAT(log, '\n', :log)",
)
}

// splitTimestampsFromMessages takes a log as []byte and returns timestamps and messages as separate string slices.
// Additionally, it updates the last checked timestamp for the log
func (ls *LogSync) splitTimestampsFromMessages(log []byte, curContainerId [20]byte) (times []string, messages []string, err error) {

stringReader := strings.NewReader(string(log))
Expand Down Expand Up @@ -72,6 +88,7 @@ func (ls *LogSync) splitTimestampsFromMessages(log []byte, curContainerId [20]by
return times, messages, nil
}

// removeFromList removes pod from maintained list
func (ls *LogSync) removeFromList(id database.ID) {
out := make([]*kcorev1.Pod, 0)

Expand All @@ -87,6 +104,7 @@ func (ls *LogSync) removeFromList(id database.ID) {
ls.list = out
}

// MaintainList adds pods from the addChannel to the list and deletes pods from the deleteChannel from the list
func (ls *LogSync) MaintainList(ctx context.Context, addChannel <-chan contracts.KUpsert, deleteChannel <-chan contracts.KDelete) error {

ls.logger.Info("Starting maintain list")
Expand Down Expand Up @@ -149,6 +167,8 @@ func (ls *LogSync) MaintainList(ctx context.Context, addChannel <-chan contracts
return g.Wait()
}

// Run starts syncing the logs to the database. Therefore, it loops over all
// containers of each pod in the maintained list every 15 seconds.
func (ls *LogSync) Run(ctx context.Context) error {

ls.logger.Info("Starting sync")
Expand Down Expand Up @@ -218,7 +238,7 @@ func (ls *LogSync) Run(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second * 5):
case <-time.After(time.Second * 15):
}
}
})
Expand All @@ -229,13 +249,3 @@ func (ls *LogSync) Run(ctx context.Context) error {

return g.Wait()
}

func (ls *LogSync) upsertStmt() string {
return fmt.Sprintf(
"INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s",
"log",
"id, reference_id, container_name, time, log",
":id, :reference_id, :container_name, :time, :log",
"time=CONCAT(time, '\n', :time), log=CONCAT(log, '\n', :log)",
)
}
5 changes: 5 additions & 0 deletions pkg/sync/spreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,22 @@ import (
"sync/atomic"
)

// ChannelSpreader takes a channel of type T and fans it out to an array of other channels of type T
type ChannelSpreader[T any] struct {
channelToBreak <-chan T
createdChannels []chan<- T
channels []chan<- T
started atomic.Bool
}

// NewChannelSpreader creates new ChannelSpreader initialized with the channel to break
func NewChannelSpreader[T any](channelToBreak <-chan T) *ChannelSpreader[T] {
return &ChannelSpreader[T]{
channelToBreak: channelToBreak,
}
}

// NewChannel returns and adds new output channel to the list of created channels
func (cs *ChannelSpreader[T]) NewChannel() <-chan T {
if cs.started.Load() == true {
panic("ChannelSpreader already started")
Expand All @@ -29,6 +32,7 @@ func (cs *ChannelSpreader[T]) NewChannel() <-chan T {
return channel
}

// AddChannel adds given output channel to the list of added channels
func (cs *ChannelSpreader[T]) AddChannel(channel chan<- T) {
if cs.started.Load() == true {
panic("ChannelSpreader already started")
Expand All @@ -37,6 +41,7 @@ func (cs *ChannelSpreader[T]) AddChannel(channel chan<- T) {
cs.channels = append(cs.channels, channel)
}

// Run combines the lists and starts fanning out the channel to the channels from the list
func (cs *ChannelSpreader[T]) Run(ctx context.Context) error {

cs.started.Store(true)
Expand Down

0 comments on commit b7d3bce

Please sign in to comment.