调试Go例程泄漏

调试Go例程泄漏

在开始调试Goroutines泄漏之前,让我先简要介绍一些基础知识,这些基础知识将使您对问题有更广阔的了解。

并发编程。

  • 并发编程处理程序的并发执行,其中,同时运行多个顺序执行流,从而可以更快地执行计算。

  • 它有助于更好地利用处理器的多核功能,以实现更快的结果,并发/并行程序是必需的。

Goroutines

Goroutine是Go运行时管理的轻量级线程。


1_CeFuCDgaOsAhycBEeq_x5g.jpeg
Goroutine使并发执行成为可能。


简单的编程可同时将数字相加。

package main

import "fmt"

// function to add an array of numbers.
func sum(s []int, c chan int) {
	sum := 0
	for _, v := range s {
		sum += v
	}
	// writes the sum to the go routines.
	c <- sum // send sum to c
}

func main() {
	s := []int{7, 2, 8, -9, 4, 0}

	c1 := make(chan int)
	c2 := make(chan int)
	// spin up a goroutine.
	go sum(s[:len(s)/2], c1)
	// spin up a goroutine.
	go sum(s[len(s)/2:], c2)
	x, y := <-c1, <-c2 // receive from c1 aND C2

	fmt.Println(x, y, x+y)
}

并发编程不再是可选的,它是开发在多核处理器上运行的现代软件所必需的。


1_pjS6nGPUYNe7kvFln3J9Zg.jpeg
1_NTmbLFXmz1Q2_Y8HsHDHXw.jpeg


就像在为达成共同目标而进行的任何协调努力的情况下一样,需要同步和沟通。

在上述程序中,每个go例程计算出总和后,它们需要与主goroutine协调以返回结果以计算最终值。


Go的同步方法。


1_o6duCQJ2UNcxzY6EIb32Uw.jpeg
1_GWYUFH14uOVLNHY-L1tv2w.jpeg


Go鼓励使用通道在goroutine之间传递对数据的引用,而不是显式使用锁来调解对共享数据的访问。这种方法可确保在给定时间只有一个goroutine可以访问数据。

现在,让我们在Go中开始并发编程。

如果到目前为止,您将了解不再需要编写并发程序,而Go可以使您轻松实现这一点。另外,您知道Go通道并将其用于Goroutine之间的同步。现在让我们转到同步Goroutine的更困难的部分。


同步可能出错!

听起来很恐怖!!!! 但是可能出什么问题了!!!!!

好吧,go例程之间的协调有很多方法可能出错。

这可能会导致某些goroutine永远等待!


1_TnM7AJcFAkpGpJplbl-iBg.jpeg


请注意,每次使用go关键字时,Go例程将如何退出。

写入没有接收器的频道。

这是一个简单的示例,说明如何在没有接收器的情况下写入通道会导致go例程永远被阻塞。

package main

import (
        "fmt"
        "log"
        "net/http"
        "strconv"
)

// function to add an array of numbers.
func sum(s []int, c chan int) {
        sum := 0
        for _, v := range s {
                sum += v
        }
        // writes the sum to the go routines.
        c <- sum // send sum to c
}

// HTTP handler for /sum
func sumConcurrent(w http.ResponseWriter, r *http.Request) {
        s := []int{7, 2, 8, -9, 4, 0}

        c1 := make(chan int)
        c2 := make(chan int)
        // spin up a goroutine.
        go sum(s[:len(s)/2], c1)
        // spin up a goroutine.
        go sum(s[len(s)/2:], c2)
        // not reading from c2.
        // go routine writing to c2 will be blocked.
        x := <-c1
        // write the response.
        fmt.Fprintf(w, strconv.Itoa(x))
}

func main() {
        http.HandleFunc("/sum", sumConcurrent)   // set router
        err := http.ListenAndServe(":8001", nil) // set listen port
        if err != nil {
                log.Fatal("ListenAndServe: ", err)
        }
}

注意:实际上,这不是编写程序的方式。它是关于如何引入泄漏的简单说明,我们将进一步使用此代码来识别泄漏并调试应用程序

在没有作者的情况下从频道接收。

范例1:封锁for-select

for {
   select { 
        case <-c: 
         // process here
    }
}

示例2:在通道上循环。

go func() {
for range ch { }
}()

良好做法

使用超时通道

timeout := make(chan bool, 1)
go func() {
    time.Sleep(1e9) // one second
    timeout <- true
}()

select {
case <- ch:
    // a read from ch has occurred
case <- timeout:
    // the read from ch has timed out
}
           OR 
select {
    case res := <-c1:
        fmt.Println(res)
    case <-time.After(time.Second * 1):
        fmt.Println("timeout 1")
}

使用上下文包。

Golang上下文包可用于优雅地结束执行例程,甚至超时。


泄漏检测。

用于检测Web服务器中的泄漏的公式是添加检测端点,并将其与负载测试一起使用。

// get the count of number of go routines in the system.
func countGoRoutines() int {
        return runtime.NumGoroutine()
}      

func getGoroutinesCountHandler(w http.ResponseWriter, r *http.Request) {
        // Get the count of number of go routines running.
        count := countGoRoutines()
        w.Write([]byte(strconv.Itoa(count)))
}
func main()
   http.HandleFunc("/_count", getGoroutinesCountHandler)
}

在负载测试之前和之后,使用能够响应系统中活跃的goroutine数量的检测端点。

这是您的负载测试程序的流程:

Step 1: Call the instrumentation endpoint and get the count of number of goroutines alive in your webserver.
Step 2: Perform load test.Lets the load be concurrent. 
     for i := 0; i < 100 ; i++ {
          go callEndpointUnderInvestigation()
     }
Step 3: Call the instrumentation endpoint and get the count of number of goroutines alive in your webserver.

如果在负载测试后系统中存在的goroutine数量异常增加,则存在泄漏的证据。

这是一个Web服务器的端点泄漏的小示例。通过一个简单的测试,我们可以确定服务器是否存在泄漏。

package main

import (
	"fmt"
	"log"
	"net/http"
	"runtime"
	"strconv"
)

// get the count of number of go routines in the system.
func countGoRoutines() int {
	return runtime.NumGoroutine()
}

func getGoroutinesCountHandler(w http.ResponseWriter, r *http.Request) {
	// Get the count of number of go routines running.
	count := countGoRoutines()
	w.Write([]byte(strconv.Itoa(count)))
}

// function to add an array of numbers.
func sum(s []int, c chan int) {
	sum := 0
	for _, v := range s {
		sum += v
	}
	// writes the sum to the go routines.
	c <- sum // send sum to c
}

// HTTP handler for /sum
func sumConcurrent(w http.ResponseWriter, r *http.Request) {
	s := []int{7, 2, 8, -9, 4, 0}

	c1 := make(chan int)
	c2 := make(chan int)
	// spin up a goroutine.
	go sum(s[:len(s)/2], c1)
	// spin up a goroutine.
	go sum(s[len(s)/2:], c2)
	// not reading from c2.
	// go routine writing to c2 will be blocked.
  // Since we are not reading from c2, 
  // the goroutine attempting to write to c2 
  // will be blocked forever resulting in leak.
	x := <-c1
	// write the response.
	fmt.Fprintf(w, strconv.Itoa(x))
}

func main() {
	// get the sum of numbers.
	http.HandleFunc("/sum", sumConcurrent)
	// get the count of number of go routines in the system.
	http.HandleFunc("/_count", getGoroutinesCountHandler)
	err := http.ListenAndServe(":8001", nil)
	if err != nil {
		log.Fatal("ListenAndServe: ", err)
	}
}
package main

import (
	"io/ioutil"
	"log"
	"net/http"
	"strconv"
	"sync"
)

const (
	leakyServer = "http://localhost:8001"
)

// get the count of the number of go routines in the server.
func getRoutineCount() (int, error) {
	body, err := getReq("/_count")

	if err != nil {
		return -1, err
	}
	count, err := strconv.Atoi(string(body))
	if err != nil {
		return -1, err
	}
	return count, nil
}

// Send get request and return the repsonse body.
func getReq(endPoint string) ([]byte, error) {
	response, err := http.Get(leakyServer + endPoint)
	if err != nil {
		return []byte{}, err
	}
	defer response.Body.Close()
	body, err := ioutil.ReadAll(response.Body)

	if err != nil {
		return []byte{}, err
	}
	return body, nil
}


func main() {
	// get the number of go routines in the leaky server.
	count, err := getRoutineCount()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("\n %d Go routines before the load test in the system.", count)

	var wg sync.WaitGroup
	// send 50 concurrent request to the leaky endpoint.
	for i := 0; i < 50; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()
			_, err = getReq("/sum")
			if err != nil {
				log.Fatal(err)
			}

		}()
	}
	wg.Wait()
	// get the cout of number of goroutines in the system after the load test.
	count, err = getRoutineCount()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("\n %d Go routines after the load test in the system.", count)
}
// First run the leaky server 
$ go run leaky-server.go
// Run the load test now.
$ go run load.go
 3 Go routines before the load test in the system.
 54 Go routines after the load test in the system.

您可以清楚地看到,对泄漏端点的50个并发请求使系统中的go例程增加了50个。

让我们再次运行负载测试。

$ go run load.go
 53 Go routines before the load test in the system.
 104 Go routines after the load test in the system.

很明显,每次运行负载测试时,服务器中的go例程数量都在增加,并且没有下降。这是泄漏的明显证据。

确定泄漏源。

使用堆栈跟踪检测。

一旦确定了Web服务器中存在泄漏,现在就需要确定泄漏的来源。

添加将返回Web服务器的堆栈跟踪的终结点可以帮助您确定泄漏的来源。

import (
  "runtime/debug"
  "runtime/pprof"
)
func getStackTraceHandler(w http.ResponseWriter, r *http.Request) {
       stack := debug.Stack()
       w.Write(stack)
       pprof.Lookup("goroutine").WriteTo(w, 2)
}
func main() {
http.HandleFunc("/_stack", getStackTraceHandler)
}

在确定泄漏的存在之后,请在加载之前和之后使用端点获取堆栈跟踪,以识别泄漏的来源。

将堆栈跟踪工具添加到泄漏服务器,然后再次执行负载测试。这是代码:

package main

import (
	"fmt"
	"log"
	"net/http"
	"runtime"
	"runtime/debug"
	"runtime/pprof"
	"strconv"
)

// get the count of number of go routines in the system.
func countGoRoutines() int {
	return runtime.NumGoroutine()
}

// respond with number of go routines in the system.
func getGoroutinesCountHandler(w http.ResponseWriter, r *http.Request) {
	// Get the count of number of go routines running.
	count := countGoRoutines()
	w.Write([]byte(strconv.Itoa(count)))
}

// respond with the stack trace of the system.
func getStackTraceHandler(w http.ResponseWriter, r *http.Request) {
	stack := debug.Stack()
	w.Write(stack)
	pprof.Lookup("goroutine").WriteTo(w, 2)
}

// function to add an array of numbers.
func sum(s []int, c chan int) {
	sum := 0
	for _, v := range s {
		sum += v
	}
	// writes the sum to the go routines.
	c <- sum // send sum to c
}

// HTTP handler for /sum
func sumConcurrent(w http.ResponseWriter, r *http.Request) {
	s := []int{7, 2, 8, -9, 4, 0}

	c1 := make(chan int)
	c2 := make(chan int)
	// spin up a goroutine.
	go sum(s[:len(s)/2], c1)
	// spin up a goroutine.
	go sum(s[len(s)/2:], c2)
	// not reading from c2.
	// go routine writing to c2 will be blocked.
	x := <-c1
	// write the response.
	fmt.Fprintf(w, strconv.Itoa(x))
}

func main() {
	// get the sum of numbers.
	http.HandleFunc("/sum", sumConcurrent)
	// get the count of number of go routines in the system.
	http.HandleFunc("/_count", getGoroutinesCountHandler)
	// respond with the stack trace of the system.
	http.HandleFunc("/_stack", getStackTraceHandler)
	err := http.ListenAndServe(":8001", nil)
	if err != nil {
		log.Fatal("ListenAndServe: ", err)
	}
}
package main

import (
	"io/ioutil"
	"log"
	"net/http"
	"strconv"
	"sync"
)

const (
	leakyServer = "http://localhost:8001"
)

// get the count of the number of go routines in the server.
func getRoutineCount() (int, error) {
	body, err := getReq("/_count")

	if err != nil {
		return -1, err
	}
	count, err := strconv.Atoi(string(body))
	if err != nil {
		return -1, err
	}
	return count, nil
}

// Send get request and return the repsonse body.
func getReq(endPoint string) ([]byte, error) {
	response, err := http.Get(leakyServer + endPoint)
	if err != nil {
		return []byte{}, err
	}
	defer response.Body.Close()
	body, err := ioutil.ReadAll(response.Body)

	if err != nil {
		return []byte{}, err
	}
	return body, nil
}

// obtain stack trace of the server.
func getStackTrace() (string, error) {
	body, err := getReq("/_stack")

	if err != nil {
		return "", err
	}
	return string(body), nil
}

func main() {
	// get the number of go routines in the leaky server.
	count, err := getRoutineCount()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("\n %d Go routines before the load test in the system.", count)

	var wg sync.WaitGroup
	// send 50 concurrent request to the leaky endpoint.
	for i := 0; i < 50; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()
			_, err = getReq("/sum")
			if err != nil {
				log.Fatal(err)
			}

		}()
	}
	wg.Wait()
	// get the cout of number of goroutines in the system after the load test.
	count, err = getRoutineCount()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("\n %d Go routines after the load test in the system.", count)
	// obtain the stack trace of the system.
	trace, err := getStackTrace()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("\nStack trace after the load test : \n %s",trace)
}
// First run the leaky server
$ go run leaky-server.go
// Run the load test now.
$ go run load.go
 3 Go routines before the load test in the system.
 54 Go routines after the load test in the system.
 goroutine 149 [chan send]:
main.sum(0xc420122e58, 0x3, 0x3, 0xc420112240)
        /home/karthic/gophercon/count-instrument.go:39 +0x6c
created by main.sumConcurrent
        /home/karthic/gophercon/count-instrument.go:51 +0x12b

goroutine 243 [chan send]:
main.sum(0xc42021a0d8, 0x3, 0x3, 0xc4202760c0)
        /home/karthic/gophercon/count-instrument.go:39 +0x6c
created by main.sumConcurrent
        /home/karthic/gophercon/count-instrument.go:51 +0x12b

goroutine 259 [chan send]:
main.sum(0xc4202700d8, 0x3, 0x3, 0xc42029c0c0)
        /home/karthic/gophercon/count-instrument.go:39 +0x6c
created by main.sumConcurrent
        /home/karthic/gophercon/count-instrument.go:51 +0x12b

goroutine 135 [chan send]:
main.sum(0xc420226348, 0x3, 0x3, 0xc4202363c0)
        /home/karthic/gophercon/count-instrument.go:39 +0x6c
created by main.sumConcurrent
        /home/karthic/gophercon/count-instrument.go:51 +0x12b

goroutine 166 [chan send]:
main.sum(0xc4202482b8, 0x3, 0x3, 0xc42006b8c0)
        /home/karthic/gophercon/count-instrument.go:39 +0x6c
created by main.sumConcurrent
        /home/karthic/gophercon/count-instrument.go:51 +0x12b

goroutine 199 [chan send]:
main.sum(0xc420260378, 0x3, 0x3, 0xc420256480)
        /home/karthic/gophercon/count-instrument.go:39 +0x6c
created by main.sumConcurrent
        /home/karthic/gophercon/count-instrument.go:51 +0x12b
........

烟囱痕迹清晰地指向泄漏的震中中心。

使用分析。

由于泄漏的goroutine通常会在尝试读取或写入通道时被阻塞,甚至可能正在休眠,因此进行性能分析可以帮助您识别泄漏的根源。

这是我在Gophercon 2016上关于基准和性能分析的演讲



1_Pl1bnsxrFJwXRaWm6lIaIQ.jpeg


重要的是在被测端点处于负载状态时完成仪器测试和性能分析。

避免泄漏,尽早发现

单元测试和功能测试中的仪器可以帮助及早发现泄漏。

计算测试前后的goroutine数量。

func TestMyFunc() {
 // get count of go routines.
 perform the test.
 // get the count diff.
 // alert if there's an unexpected rise.
}

测试中的堆栈差异。

Stack Diff是一个简单的程序,它在测试之前和之后对堆栈跟踪进行比较,并在系统中剩余任何不需要的goroutine时发出警报。将其与您的单元测试和功能测试集成在一起,可以帮助您在开发过程中识别泄漏。

import (
    github.com/fortytw2/leaktest
)
func TestMyFunc(t *testing.T) {
    defer leaktest.Check(t)()

    go func() {
        for {
            time.Sleep(time.Second)
        }
    }()
}

设计安全

具有作为单独容器/进程运行的最终服务/端点的微服务体系结构可以节省受端点/服务之一中的泄漏或资源中断影响的整个系统。如果由Kubernetes,Mesosphere和Docker Swarm之类的工具管理业务流程,那将非常棒。


1_V2eQNITpOaY8h-tdGn8RXw.gif


进行成像以获取整个系统的堆栈跟踪,并尝试确定在数百个服务中导致泄漏的服务!!!它真的很可怕!!!

Goroutine泄漏就像是慢速杀手。它们会在一段时间内缓慢累积,从而浪费您的计算资源,您甚至不会注意到。了解您的泄漏并尽早调试它们非常重要,您应该了解这一点!


上一篇 下一篇