简介

watchdog
watchdog

watchdog 为 function 提供了一个通用的接口,用于被外层应用调用。watchdog 本质是一个 WebServer,把接收到的 Request 通过 stdin 传输给 function 的作为其参数,function 把返回值通过 stdout 输出,watchdog 捕捉 stdout,并把捕捉到的结果通过 Response 返回。而这个调用过程对于 function 是无感的,function 只要专心处理内部逻辑即可。

为了使 watchdog 支持运行多种不同语言编写的 function,所以需要通过模板把 function 进行封装,不同语言模板不同,但是效果是一致的。封装后的程序可以想象成一个二进制程序,接收 watchdog stdin 写入的参数 ,并把结果通过 stdout 输出。

watchdog 通过 /tmp/.lock 文件作为健康检查的标志文件,可以通过删除 /tmp/.lock 可以触发 Swarm 重新部署新的程序

模板

以 Go 语言 tag 1.2 的模板为例

$ tree
.
├── Dockerfile
├── function
│   └── handler.go
├── main.go
└── template.yml

1 directory, 4 files
  • main.go 接收 stdin,调用 function,把结果输出。最终会被编译成二进制程序被 watchdog 调用
func main() {
	input, err := ioutil.ReadAll(os.Stdin)
	fmt.Println(function.Handle(input))
}
  • function/handler.go function 具体代码,function 被当做第三包引入到 main.go 中,所以其 Handle 函数名、参数和返回值是固定的(或者说跟 main.go 是对应的)
  • Dockerfile 导入 watchdog,编译 main.go,设置 fprocess 环境变量,暴露端口 ,创建镜像等操作

其他语言与这个过程类似。fprocess 环境变量被用于 watchdog 调用,所以需要设置为运行编译二进制的方法,例如,Go 的为 fprocess="./handler" 而 Python 的为 fprocess="python index.py"

watchdog 源码

watchdog 源码: tag 0.16.0

简单流程图

读取配置信息

从环境变量中读取配置信息 func (ReadConfig) Read(hasEnv HasEnv) WatchdogConfig

配置信息的具体配置项 README.md

创建服务和优雅关闭

创建 HTTP 服务器,设置读写超时时间和最大 Header 容量为 1MB,并且监听 /_/health/ 路由,只要路由不是 /_/health 都会被认为是调用 function。

s := &http.Server{
    Addr:           fmt.Sprintf(":%d", config.port),
    ReadTimeout:    readTimeout,
    WriteTimeout:   writeTimeout,
    MaxHeaderBytes: 1 << 20, // Max header of 1MB
}

httpMetrics := metrics.NewHttp()

// 路由,注意代码顺序
http.HandleFunc("/_/health", makeHealthHandler())
http.HandleFunc("/", metrics.InstrumentHandler(makeRequestHandler(&config), httpMetrics))

// "/" 路由还有限流功能
// https://github.com/openfaas/faas/blob/df97efafae36ce7093ad353e3e6acc0e93d6300e/watchdog/handler.go#L301
limiter.NewConcurrencyLimiter(handler, config.maxInflight).ServeHTTP

并且 / 路由会被增加 prometheus 监控,统计其请求数和每个请求的运行时间

func NewHttp() Http {
	return Http{
		RequestsTotal: promauto.NewCounterVec(prometheus.CounterOpts{
			Subsystem: "http",
			Name:      "requests_total",
			Help:      "total HTTP requests processed",
		}, []string{"code", "method"}),
		RequestDurationHistogram: promauto.NewHistogramVec(prometheus.HistogramOpts{
			Subsystem: "http",
			Name:      "request_duration_seconds",
			Help:      "Seconds spent serving HTTP requests.",
			Buckets:   prometheus.DefBuckets,
		}, []string{"code", "method"}),
	}
}

启动 HTTP 服务和 prometheus 的监控服务,在启动 HTTP 服务时会根据 suppressLock 配置项决定是否创建 /tmp/.lock 文件,除了 /tmp/.lock 文件还有 acceptingConnections 全局变量,当没有 /tmp/.lock 或者 acceptingConnections 为 0 都说明服务不健康,需要被 Swarm 处理。

// https://github.com/openfaas/faas/blob/df97efafae36ce7093ad353e3e6acc0e93d6300e/watchdog/main.go#L135-L147
if suppressLock == false {
    path, writeErr := createLockFile()
} else {
    atomic.StoreInt32(&acceptingConnections, 1)
}

// https://github.com/openfaas/faas/blob/df97efafae36ce7093ad353e3e6acc0e93d6300e/watchdog/handler.go#L272-L282
func createLockFile() (string, error) {
	path := filepath.Join(os.TempDir(), ".lock")
	writeErr := ioutil.WriteFile(path, []byte{}, 0660)
	atomic.StoreInt32(&acceptingConnections, 1)
	return path, writeErr
}

// https://github.com/openfaas/faas/blob/df97efafae36ce7093ad353e3e6acc0e93d6300e/watchdog/handler.go#L282-L301
if atomic.LoadInt32(&acceptingConnections) == 0 || lockFilePresent() == false {
    w.WriteHeader(http.StatusServiceUnavailable)
    return
}

同时会监听 Unix 信号,比较简单的优雅关闭

function调用

除了健康检查以外所有的请求都会被 pipeRequest 方法处理,pipeRequest 执行 function 就是 fork function 进程,流程如下:

  1. 通过 fprocess 环境变量获得具体调用的命令
  2. 把 HTTP Request 一些头部信息进行转换增加到调用命令的环境变量里
  3. 启动两个协程,一个协程往 function 里写入,另外一个协程读取 function 输出
  4. 如果设置了执行超时则启动定时器,在定时器执行时如果 function 还没有运行完则 kill
  5. 统计 function 运行时间,Content-Type 等后续操作

根据配置文件,还可以把 Request Body 和 Header 进行 json 序列化,把序列化后的字段传输给 function,源码

处理 HTTP Request 时会把 Header、Query、Path、Host 等增加到环境变量里

// https://github.com/openfaas/faas/blob/df97efafae36ce7093ad353e3e6acc0e93d6300e/watchdog/handler.go#L223

// Content-Type=text/html  =>  Http_Content_Type=text/html
for k, v := range r.Header {
    kv := fmt.Sprintf("Http_%s=%s", strings.Replace(k, "-", "_", -1), v[0])
    envs = append(envs, kv)
}
envs = append(envs, fmt.Sprintf("Http_Method=%s", method))
envs = append(envs, fmt.Sprintf("Http_Content_Length=%d", r.ContentLength))
envs = append(envs, fmt.Sprintf("Http_Query=%s", r.URL.RawQuery))
envs = append(envs, fmt.Sprintf("Http_Path=%s", r.URL.Path))
envs = append(envs, fmt.Sprintf("Http_Host=%s", r.Host))

pipeRequest 的简要代码,具体执行见注释

func pipeRequest(config *WatchdogConfig, w http.ResponseWriter, r *http.Request, method string) {
	startTime := time.Now()

	// 根据环境变量 fprocess 获得具体的 exec 命令
	parts := strings.Split(config.faasProcess, " ")
	targetCmd := exec.Command(parts[0], parts[1:]...)

	// 把 Request 中的 Header 添加到 exec 的环境变量
	envs := getAdditionalEnvs(config, r, method)
	if len(envs) > 0 {
		targetCmd.Env = envs
	}

	// 创建 exec 输入端
	writer, _ := targetCmd.StdinPipe()

	// 从 Request Body 中读取请求
	requestBody, buildInputErr = buildFunctionInput(config, r)
	if buildInputErr != nil {
		// 读取错误,返回错误信息
		return
	}

	wg.Add(2)

	var timer *time.Timer

	if config.execTimeout > 0*time.Second {
		timer = time.AfterFunc(config.execTimeout, func() {
			if targetCmd != nil && targetCmd.Process != nil {
				// 如果超时则 kill 进程
				targetCmd.Process.Kill()
			}
		})
	}

	// 启动一个协程,向 exec 的 pipe 中写入参数
	go func() {
		defer wg.Done()
		writer.Write(requestBody)
		writer.Close()
	}()

	// 启动一个协程读取 stdOut 和 stdErr
	go func() {
		var b bytes.Buffer
		targetCmd.Stderr = &b
		defer wg.Done()

		out, err = targetCmd.Output()
		b.Reset()
	}()

	// 等待两个协程停止
	wg.Wait()

	execDuration := time.Since(startTime).Seconds()

	// 把输出写入到 Response
	w.Header().Set("Content-Type", config.contentType)
	w.Header().Set("X-Duration-Seconds", fmt.Sprintf("%f", execDuration))
	w.WriteHeader(200)
	w.Write(out)
}

总结

整体比较简单,代码量不是很多,比较容易