Karthic.K

Reputation: 436

How to collect k8s pod logs in parallel using golang, only for a duration of time

I am new to golang. I have a task to collect application logs; the application runs as a deployment in a k8s cluster, with 4 pods in total. As part of test automation, I need to collect the application logs in parallel (only for the duration of my test operation) while I perform some operations on the application, write the logs to a file, then move on to the next operation and do the same.

Finally, I iterate through the log files one by one, filter for certain keywords corresponding to my operation, and validate whether the logs are correct.


I am thinking of getting the pod logs using the kubectl command directly instead of the Go SDK, because with the SDK I am facing missing log entries that I couldn't triage despite many attempts.

kubectl logs -f -l app=my-app -n dev > /usr/mylogs.txt

I found a way to run this command using exec.Command:

command := exec.Command("/bin/sh", "-c", "kubectl logs -f -l app=my-app -n dev > /usr/mylogs.txt")
err := command.Start()

Now I need to do this in golang:

func myTest(t *testing.T) {
    go collectApplicationLogs("test1")

    // do application operation
    // run test

    stopLogsCollection() // -------> how to achieve this?
}

func collectApplicationLogs(fileName string) {
    // command to collect the logs to a file
    // kubectl logs -f -l app=my-app -n dev > /usr/{fileName}
}

Upvotes: 1

Views: 643

Answers (1)

Raihan Khan

Reputation: 446

You can use the Kubernetes Go client (client-go) to get logs from the pods. First, create the clientset from the Kubernetes config. You can use either the in-cluster config or an out-of-cluster config; I have used the out-of-cluster config here.

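package main

import (
    "bufio"
    "context"
    "flag"
    "io"
    "log"
    "os"
    "path/filepath"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/homedir"
)

const (
    // set namespace and label
    namespace = "dev"
    label     = "app=my-app"
)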

func main() {
    // locate the kubeconfig file (defaults to ~/.kube/config)
    var kubeconfig *string
    if home := homedir.HomeDir(); home != "" {
        kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
    } else {
        kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
    }
    flag.Parse()

    // use the current context in kubeconfig
    ctx := context.TODO()
    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        log.Println(err, "Failed to build config from flags")
        return
    }

    err = collectApplicationLogs(ctx, config, "/usr/mylogs.txt")
    if err != nil {
        log.Println(err, "Failed to collect logs")
    }

}

To collect logs, you need to list the pods first. Then get the logs from each of those pods and append them to the file concurrently.

func collectApplicationLogs(ctx context.Context, config *rest.Config, filename string) error {
    // create the clientset
    clientSet, err := kubernetes.NewForConfig(config)
    if err != nil {
        log.Println("Failed to create clientset from the given config")
        return err
    }
    // get the pods as ListItems
    pods, err := clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
        LabelSelector: label,
    })
    if err != nil {
        log.Println(err, "Failed to get pods")
        return err
    }
    // create the file if it doesn't exist; otherwise append to it
    file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
    if err != nil {
        return err
    }
    defer file.Close()
    // get the pod list first,
    // then stream the logs from each pod
    // and write them to the file concurrently
    podItems := pods.Items
    // buffered so that writers never block on send, even if we return early
    ch := make(chan bool, len(podItems))
    for i := 0; i < len(podItems); i++ {
        podLogs, err := clientSet.CoreV1().Pods(namespace).GetLogs(podItems[i].Name, &v1.PodLogOptions{
            Follow: true,
        }).Stream(ctx)
        if err != nil {
            return err
        }
        // close every stream when this function returns
        defer podLogs.Close()
        buffer := bufio.NewReader(podLogs)
        go writeLogs(buffer, file, ch)
    }
    // wait for every writer, not just the first one to finish
    for range podItems {
        <-ch
    }
    return nil
}

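func writeLogs(buffer *bufio.Reader, file *os.File, ch chan bool) {
    // signal completion to the caller, whatever the exit path
    defer func() {
        ch <- true
    }()
    for {
        str, readErr := buffer.ReadString('\n')
        // ReadString can return data together with an error, so write first
        if len(str) > 0 {
            if _, err := file.Write([]byte(str)); err != nil {
                return
            }
        }
        // io.EOF or any other read error ends this stream
        if readErr != nil {
            return
        }
    }
}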

Upvotes: 4
