Skip to content

Commit

Permalink
Add feature for syncing container logs
Browse files Browse the repository at this point in the history
  • Loading branch information
jrauh01 committed Nov 23, 2023
1 parent efb7536 commit 5da9bc0
Show file tree
Hide file tree
Showing 6 changed files with 453 additions and 38 deletions.
48 changes: 28 additions & 20 deletions cmd/icinga-kubernetes/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/icinga/icinga-go-library/driver"
"github.com/icinga/icinga-go-library/logging"
"github.com/icinga/icinga-kubernetes/internal"
"github.com/icinga/icinga-kubernetes/pkg/contracts"
"github.com/icinga/icinga-kubernetes/pkg/schema"
"github.com/icinga/icinga-kubernetes/pkg/sync"
"github.com/okzk/sdnotify"
Expand Down Expand Up @@ -70,22 +71,10 @@ func main() {

g, ctx := errgroup.WithContext(ctx)

forwardUpsertNodesChannel := make(chan<- any)
defer close(forwardUpsertNodesChannel)

forwardDeleteNodesChannel := make(chan<- any)
defer close(forwardDeleteNodesChannel)

forwardUpsertNamespacesChannel := make(chan<- any)
defer close(forwardUpsertNamespacesChannel)

forwardDeleteNamespacesChannel := make(chan<- any)
defer close(forwardDeleteNamespacesChannel)

forwardUpsertPodsChannel := make(chan<- any)
forwardUpsertPodsChannel := make(chan database.Entity)
defer close(forwardUpsertPodsChannel)

forwardDeletePodsChannel := make(chan<- any)
forwardDeletePodsChannel := make(chan any)
defer close(forwardDeletePodsChannel)

g.Go(func() error {
Expand All @@ -94,8 +83,6 @@ func main() {
schema.NewNode,
informers.Core().V1().Nodes().Informer(),
logs.GetChildLogger("Nodes"),
sync.WithForwardUpsert(forwardUpsertNodesChannel),
sync.WithForwardDelete(forwardDeleteNodesChannel),
).Run(ctx)
})

Expand All @@ -105,22 +92,43 @@ func main() {
schema.NewNamespace,
informers.Core().V1().Namespaces().Informer(),
logs.GetChildLogger("Namespaces"),
sync.WithForwardUpsert(forwardUpsertNamespacesChannel),
sync.WithForwardDelete(forwardDeleteNamespacesChannel),
).Run(ctx)
})

//upsertPodChannelSpreader := sync.NewChannelSpreader[database.Entity](forwardUpsertPodsChannel)
//deletePodChannelSpreader := sync.NewChannelSpreader[any](forwardDeletePodsChannel)

//upsertPodChannel := upsertPodChannelSpreader.NewChannel()
//deletePodChannel := deletePodChannelSpreader.NewChannel()

forwardUpsertPodsToLogChannel := make(chan contracts.KUpsert)
forwardDeletePodsToLogChannel := make(chan contracts.KDelete)

g.Go(func() error {

defer close(forwardUpsertPodsToLogChannel)
defer close(forwardDeletePodsToLogChannel)

return sync.NewSync(
db,
schema.NewPod,
informers.Core().V1().Pods().Informer(),
logs.GetChildLogger("Pods"),
sync.WithForwardUpsert(forwardUpsertPodsChannel),
sync.WithForwardDelete(forwardDeletePodsChannel),
sync.WithForwardUpsertToLog(forwardUpsertPodsToLogChannel),
sync.WithForwardDeleteToLog(forwardDeletePodsToLogChannel),
).Run(ctx)
})

logSync := sync.NewLogSync(k, db, logs.GetChildLogger("ContainerLogs"))

g.Go(func() error {
return logSync.MaintainList(ctx, forwardUpsertPodsToLogChannel, forwardDeletePodsToLogChannel)
})

g.Go(func() error {
return logSync.Run(ctx)
})

if err := g.Wait(); err != nil {
logging.Fatal(errors.Wrap(err, "can't sync"))
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/schema/container.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package schema

// NOTE(review): the Container resource below is disabled scaffolding for a
// future container sync — it is kept commented out until it is actually
// wired into the sync code. Enable or remove it once container syncing
// is implemented.
//
//import (
//	"github.com/icinga/icinga-kubernetes/pkg/contracts"
//	kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
//)
//
//type Container struct {
//	kmetaWithNamespace
//}
//
//func NewContainer() contracts.Resource {
//	return &Container{}
//}
//
//func (c *Container) Obtain(kobject kmetav1.Object) {
//	c.kmetaWithNamespace.Obtain(kobject)
//}
10 changes: 10 additions & 0 deletions pkg/schema/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package schema

// Log is the database row type for synced container log output
// (table "log"). It embeds kmetaWithoutNamespace for the common
// metadata columns.
type Log struct {
	kmetaWithoutNamespace
	// Id identifies the container; filled with the SHA-1 of
	// "<namespace>/<pod>/<container>" (see LogSync.Run).
	Id []byte
	// ReferenceId identifies the owning pod; filled with the SHA-1 of
	// "<namespace>/<pod>". Used to delete rows when a pod goes away.
	ReferenceId []byte
	// ContainerName is the plain container name within the pod.
	ContainerName string
	// Time holds the newline-joined timestamps of the synced log lines.
	Time string
	// Log holds the newline-joined log messages, parallel to Time.
	Log string
}
235 changes: 235 additions & 0 deletions pkg/sync/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
package sync

import (
"bufio"
"context"
"crypto/sha1"
"fmt"
"github.com/icinga/icinga-go-library/database"
"github.com/icinga/icinga-go-library/logging"
"github.com/icinga/icinga-kubernetes/pkg/contracts"
"github.com/icinga/icinga-kubernetes/pkg/schema"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"io"
kcorev1 "k8s.io/api/core/v1"
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"slices"
"strings"
msync "sync"
"time"
)

// LogSync periodically fetches the container logs of a maintained list of
// pods from the Kubernetes API and streams them into the database.
type LogSync struct {
	// list holds the pods whose container logs are currently synced;
	// mutated by MaintainList, read by Run.
	list []*kcorev1.Pod
	// lastChecked maps a container id (SHA-1 of "<ns>/<pod>/<container>")
	// to the timestamp of the last synced log line, so already-synced
	// lines are skipped on the next poll.
	lastChecked map[[20]byte]*kmetav1.Time
	// mutex guards list against concurrent access from MaintainList and Run.
	mutex     *msync.RWMutex
	clientset *kubernetes.Clientset
	db        *database.DB
	logger    *logging.Logger
}

// NewLogSync creates a new LogSync using the given Kubernetes clientset,
// database connection and logger. The returned sync starts with an empty
// pod list; feed it via MaintainList and start it with Run.
func NewLogSync(clientset *kubernetes.Clientset, db *database.DB, logger *logging.Logger) *LogSync {
	ls := &LogSync{
		clientset:   clientset,
		db:          db,
		logger:      logger,
		mutex:       &msync.RWMutex{},
		list:        make([]*kcorev1.Pod, 0),
		lastChecked: map[[20]byte]*kmetav1.Time{},
	}

	return ls
}

// splitTimestampsFromMessages parses raw Kubernetes container log output —
// one "<RFC3339Nano timestamp> <message>" entry per line — into parallel
// slices of timestamp strings and message strings. Lines not newer than
// lastChecked[curContainerId] are skipped so already-synced entries are not
// stored twice; lines with an unparsable timestamp are logged and skipped.
// Returns an error only if reading the buffer itself fails.
func (ls *LogSync) splitTimestampsFromMessages(log []byte, curContainerId [20]byte) (times []string, messages []string, err error) {
	reader := bufio.NewReader(strings.NewReader(string(log)))

	for {
		line, readErr := reader.ReadString('\n')

		// Process the line even when readErr is io.EOF: a final log line
		// without a trailing newline was previously dropped silently.
		// The trailing '\n' is stripped so the later "\n"-joins and the
		// SQL CONCAT do not produce doubled newlines.
		if trimmed := strings.TrimSuffix(line, "\n"); trimmed != "" {
			stamp, message, _ := strings.Cut(trimmed, " ")

			messageTime, parseErr := time.Parse("2006-01-02T15:04:05.999999999Z", stamp)
			if parseErr != nil {
				// A malformed timestamp invalidates only this line; it must
				// not take the whole process down. (The previous
				// logging.Fatal here also made the skip unreachable.)
				ls.logger.Error(errors.Wrap(parseErr, "error parsing log timestamp"))
				continue
			}

			// Skip entries already covered by a previous poll.
			if last := ls.lastChecked[curContainerId]; last != nil && messageTime.UnixNano() <= last.UnixNano() {
				continue
			}

			times = append(times, stamp)
			messages = append(messages, message)
		}

		if readErr != nil {
			if readErr == io.EOF {
				break
			}

			return nil, nil, errors.Wrap(readErr, "error reading log message")
		}
	}

	return times, messages, nil
}

// removeFromList drops the pod identified by id from ls.list. The id is
// matched against the hex form of the SHA-1 of "<namespace>/<name>" of each
// listed pod. Callers must hold the list lock.
func (ls *LogSync) removeFromList(id database.ID) {
	target := id.String()

	ls.list = slices.DeleteFunc(ls.list, func(pod *kcorev1.Pod) bool {
		sum := sha1.Sum([]byte(pod.Namespace + "/" + pod.Name))

		return fmt.Sprintf("%x", sum) == target
	})
}

// MaintainList keeps ls.list in sync with the pod upserts and deletes
// received on addChannel and deleteChannel. Ids of deleted pods are
// additionally streamed to the database so their stored log rows are
// removed. Blocks until both channels are closed or ctx is canceled.
func (ls *LogSync) MaintainList(ctx context.Context, addChannel <-chan contracts.KUpsert, deleteChannel <-chan contracts.KDelete) error {
	ls.logger.Info("Starting maintain list")

	g, ctx := errgroup.WithContext(ctx)

	deletes := make(chan any)

	g.Go(func() error {
		// Close deletes when the producer exits so DeleteStreamedByField
		// below can drain and return. Closing it from MaintainList itself
		// (function-level defer) would run only after g.Wait(), which
		// deadlocks once the input channels are closed.
		defer close(deletes)

		for {
			select {
			case <-ctx.Done():
				return errors.Wrap(ctx.Err(), "context canceled maintain log sync list")

			case podFromChannel, more := <-addChannel:
				if !more {
					return nil
				}

				pod := podFromChannel.KObject().(*kcorev1.Pod)

				podIsInList := false

				for _, listPod := range ls.list {
					if listPod.UID == pod.UID {
						podIsInList = true
					}
				}

				if podIsInList {
					continue
				}

				// Appending mutates the list, so the write lock is
				// required; an RLock would not exclude other accessors.
				ls.mutex.Lock()
				ls.list = append(ls.list, pod)
				ls.mutex.Unlock()

			case podIdFromChannel, more := <-deleteChannel:
				if !more {
					return nil
				}

				idOfPod := podIdFromChannel.ID()

				ls.mutex.Lock()
				ls.removeFromList(idOfPod)
				ls.mutex.Unlock()

				deletes <- idOfPod
			}
		}
	})

	g.Go(func() error {
		return ls.db.DeleteStreamedByField(ctx, &schema.Log{}, "reference_id", deletes)
	})

	return g.Wait()
}

// Run polls the Kubernetes API every five seconds for the container logs of
// every pod in ls.list and streams new log lines to the database. Blocks
// until ctx is canceled or a non-recoverable error occurs.
func (ls *LogSync) Run(ctx context.Context) error {
	ls.logger.Info("Starting sync")

	g, ctx := errgroup.WithContext(ctx)

	upsertStmt := ls.upsertStmt()

	upserts := make(chan database.Entity)

	g.Go(func() error {
		// Close upserts when the producer exits so the upsert goroutine
		// below terminates instead of blocking g.Wait() forever.
		defer close(upserts)

		for {
			// Snapshot the list under the read lock; MaintainList mutates
			// it concurrently, and iterating the live slice unguarded is a
			// data race.
			ls.mutex.RLock()
			pods := slices.Clone(ls.list)
			ls.mutex.RUnlock()

			for _, pod := range pods {
				curPodId := sha1.Sum([]byte(pod.Namespace + "/" + pod.Name))

				for _, container := range pod.Spec.Containers {
					curContainerId := sha1.Sum([]byte(pod.Namespace + "/" + pod.Name + "/" + container.Name))

					podLogOpts := kcorev1.PodLogOptions{Container: container.Name, Timestamps: true}

					// Only request lines newer than the last synced one.
					if ls.lastChecked[curContainerId] != nil {
						podLogOpts.SinceTime = ls.lastChecked[curContainerId]
					}

					log, err := ls.clientset.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &podLogOpts).Do(ctx).Raw()
					if err != nil {
						// Log via the component logger (not stdout) and keep
						// going: one failing container must not stop the
						// sync of all others.
						ls.logger.Error(errors.Wrap(err, "error reading container log"))
						continue
					}

					times, messages, err := ls.splitTimestampsFromMessages(log, curContainerId)
					if err != nil {
						return err
					}

					if len(messages) == 0 {
						continue
					}

					newLog := &schema.Log{
						Id:            curContainerId[:],
						ReferenceId:   curPodId[:],
						ContainerName: container.Name,
						Time:          strings.Join(times, "\n"),
						Log:           strings.Join(messages, "\n"),
					}

					// Guard the send with ctx so a canceled consumer cannot
					// leave this goroutine blocked forever.
					select {
					case upserts <- newLog:
					case <-ctx.Done():
						return ctx.Err()
					}

					lastTime, err := time.Parse("2006-01-02T15:04:05.999999999Z", times[len(times)-1])
					if err != nil {
						return errors.Wrap(err, "error parsing log time")
					}

					// Only advance the bookmark if the pod is still synced;
					// check the live list, not the snapshot.
					ls.mutex.RLock()
					stillListed := slices.Contains(ls.list, pod)
					ls.mutex.RUnlock()

					if !stillListed {
						continue
					}

					lastV1Time := kmetav1.Time{Time: lastTime}
					ls.lastChecked[curContainerId] = &lastV1Time
				}
			}

			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(time.Second * 5):
			}
		}
	})

	g.Go(func() error {
		return ls.db.UpsertStreamedWithStatement(ctx, upserts, upsertStmt, 5)
	})

	return g.Wait()
}

// upsertStmt returns the SQL statement that inserts a log row and, when the
// row already exists, appends the new timestamps and messages to the stored
// time and log columns, newline-separated.
func (ls *LogSync) upsertStmt() string {
	const stmt = "INSERT INTO log (id, reference_id, container_name, time, log) VALUES (:id, :reference_id, :container_name, :time, :log) ON DUPLICATE KEY UPDATE time=CONCAT(time, '\n', :time), log=CONCAT(log, '\n', :log)"

	return stmt
}
Loading

0 comments on commit 5da9bc0

Please sign in to comment.