简介

gateway
gateway

Gateway 是 OpenFaas 最关键的组件

  • 提供了 WEB 管理界面
  • 为 function 提供了 REST API
  • 具有 Function Store 功能
  • 通过 Prometheus 提供了进行自动扩容功能

开发者通过 URL 执行 function 时,请求是被 Gateway 接收,Gateway 转发请求到后端具体的 function 容器中,同时把 function 响应返回给开发者。同时 Gateway 为方便 faas-cli 和其他程序调用还提供了 REST API,API 可以创建 function、主动扩容 function、删除 function、异步调用 function 等常用功能。Gateway 通过 Provider 管理部署 function,通过 Prometheus 的 AlertManager 进行伸缩管理。

Provider 完全不被外界所知,Gateway 包装 Provider

源码

Gateway 源码以 tag 0.16.0 为例

示例代码都为缩写,但和项目逻辑一致

路由概览

Gateway 终究是一个 Web Server,所以路由才是精髓,根据功能可以把路由进行分类

// 调用已经部署的 function
// r.StrictSlash(false)	// This didn't work, so register routes twice
r.HandleFunc("/function/{name:[-a-zA-Z_0-9]+}", functionProxy)
r.HandleFunc("/function/{name:[-a-zA-Z_0-9]+}/", functionProxy)
r.HandleFunc("/function/{name:[-a-zA-Z_0-9]+}/{params:.*}", functionProxy)

// 查看系统信息
r.HandleFunc("/system/info", faasHandlers.InfoHandler).Methods(http.MethodGet)
// WebHook 功能,提前设置 Prometheus 的 AlertManager 报警规则
// 当 function 资源报警则会通过 WebHook 触发此路由,从而实现自动伸缩
r.HandleFunc("/system/alert", faasHandlers.Alert).Methods(http.MethodPost)

// 部署、删除、更新等对 function 的操作
r.HandleFunc("/system/function/{name:[-a-zA-Z_0-9]+}", faasHandlers.QueryFunction).Methods(http.MethodGet)
r.HandleFunc("/system/functions", faasHandlers.ListFunctions).Methods(http.MethodGet)
r.HandleFunc("/system/functions", faasHandlers.DeployFunction).Methods(http.MethodPost)
r.HandleFunc("/system/functions", faasHandlers.DeleteFunction).Methods(http.MethodDelete)
r.HandleFunc("/system/functions", faasHandlers.UpdateFunction).Methods(http.MethodPut)
r.HandleFunc("/system/scale-function/{name:[-a-zA-Z_0-9]+}", faasHandlers.ScaleFunction).Methods(http.MethodPost)

// 其他路由
r.HandleFunc("/system/secrets", faasHandlers.SecretHandler).Methods(http.MethodGet, http.MethodPut, http.MethodPost, http.MethodDelete)
r.HandleFunc("/system/logs", faasHandlers.LogProxyHandler).Methods(http.MethodGet)

// function 异步调用,只有部署了 NATS 才可以使用异步 function 功能
r.HandleFunc("/async-function/{name:[-a-zA-Z_0-9]+}/", faasHandlers.QueuedProxy).Methods(http.MethodPost)
r.HandleFunc("/async-function/{name:[-a-zA-Z_0-9]+}", faasHandlers.QueuedProxy).Methods(http.MethodPost)
r.HandleFunc("/async-function/{name:[-a-zA-Z_0-9]+}/{params:.*}", faasHandlers.QueuedProxy).Methods(http.MethodPost)
r.HandleFunc("/system/async-report", handlers.MakeNotifierWrapper(faasHandlers.AsyncReport, forwardingNotifiers))

// UI 路由等
r.Handle("/", http.RedirectHandler("/ui/", http.StatusMovedPermanently)).Methods(http.MethodGet)
r.PathPrefix("/ui/").Handler(uiHandler).Methods(http.MethodGet)
r.HandleFunc("/healthz", handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector)).Methods(http.MethodGet)

通过源码可知,HandlerSet 结构体是所有 http.HandlerFunc 的集合,初始化也就是逐一确定 HandlerSet 中的值

var faasHandlers types.HandlerSet
functionProxy := faasHandlers.Proxy

// https://github.com/openfaas/faas/blob/df97efafae36ce7093ad353e3e6acc0e93d6300e/gateway/types/handler_set.go#L4-L37
type HandlerSet struct {
	// Proxy invokes functions upstream
	Proxy http.HandlerFunc

	DeployFunction http.HandlerFunc
	DeleteFunction http.HandlerFunc
	ListFunctions  http.HandlerFunc
	Alert          http.HandlerFunc

	UpdateFunction http.HandlerFunc

	// QueryFunction - queries the metdata for a function
	QueryFunction http.HandlerFunc

	// QueuedProxy - queue work and return synchronous response
	QueuedProxy http.HandlerFunc

	// AsyncReport - report a deferred execution result
	AsyncReport http.HandlerFunc

	// ScaleFunction allows a function to be scaled
	ScaleFunction http.HandlerFunc

	// InfoHandler provides version and build info
	InfoHandler http.HandlerFunc

	// SecretHandler allows secrets to be managed
	SecretHandler http.HandlerFunc

	// LogProxyHandler allows streaming of logs for functions
	LogProxyHandler http.HandlerFunc
}

流程

  1. 读取配置信息
  2. 根据 UseBasicAuth 配置项确定 Provider 的 Basic Auth 信息
    • 如果需要 Basic Auth 则实例化 *auth.BasicAuthCredentials
  3. 为 Provider 创建一个 Prometheus Exporter,每 5 秒通过 Provider 接口抓取所有的 function 状态信息,暴露给 Prometheus
  4. 初始化一个通用的反向代理 Client
  5. 根据 DirectFunctions 配置项确定实现 handlers.BaseURLResolverhandlers.URLPathTransformer 接口的具体方法,DirectFunctions 配置项决定了 Gateway 是把请求通过 Provider 转发到 function 还是直接转发到 function
  6. 初始化多个 http.HandlerFunc,包括 Alert
    • 通过 plugin.NewExternalServiceQuery 实例化 alertHandler,从而实现 SetReplicasGetReplicas 方法
  7. 通过 NATSAddress 和 NATSPort 配置项决定 Gateway 是否支持异步 function 功能
  8. 启动 HTTP 服务、启动 Prometheus 服务

配置项

有几个比较重要的配置项

  • FunctionsProviderURL: 上游 Provider 的地址
  • NATSAddress,NATSPort: NATS 连接地址
  • DirectFunctions: 是否直接调用 function 而不经过 Provider
  • ScaleFromZero: 启用时 Gateway 把所有 function 启动到要求的最低副本数(暂时不是很理解)
  • UseBasicAuth: 上游 Provider 是否需要 basic auth
  • AuthProxyURL: 是一个 Basic Auth 认证连接,为空则不需要认证,例如: http://basic-auth-plugin:8080/validate

Basic Auth

如果 UseBasicAuth 配置项为 true,则会根据 SecretMountPath 配置项读取 Basic Auth 信息。

credentials 具体为 Provider 的 Basic Auth 信息,主要用在 Gateway 访问 Provider 时,需要在转发 function 请求时注入到 HTTP Request 中。

// https://github.com/openfaas/faas/blob/df97efafae36ce7093ad353e3e6acc0e93d6300e/gateway/server.go#L36-L51
// credentials is used for service-to-service auth
var credentials *auth.BasicAuthCredentials

if config.UseBasicAuth {
	reader := auth.ReadBasicAuthFromDisk{
		SecretMountPath: config.SecretMountPath,
	}
	credentials, _ = reader.Read()
}

// 用于 function 转发时注入 Basic Auth
serviceAuthInjector = &handlers.BasicAuthInjector{Credentials: credentials}
func (b BasicAuthInjector) Inject(r *http.Request) {
	if r != nil && b.Credentials != nil {
		r.SetBasicAuth(b.Credentials.User, b.Credentials.Password)
	}
}

如果 credentials 初始化成功(也就是 UseBasicAuth 配置项为 true)则请求 Gateway 的 UI、API 也需要 Basic Auth,而具体的验证方法是通过 handlers.MakeExternalAuthHandler 方法访问 AuthProxyURL 配置项的链接进行鉴权

decorateExternalAuth := handlers.MakeExternalAuthHandler

if credentials != nil {
	r.PathPrefix("/ui/").Handler(decorateExternalAuth(uiHandler.ServeHTTP, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)).Methods(http.MethodGet)
	faasHandlers.Alert =
		decorateExternalAuth(faasHandlers.Alert, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
	faasHandlers.UpdateFunction =
		decorateExternalAuth(faasHandlers.UpdateFunction, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
}	

MakeExternalAuthHandler 的作用类似一个修饰器,或者说是 HTTP 中间件。主要作用就是访问 AuthProxyURL,如果可以访问则执行具体的 http.HandlerFunc,否则返回错误响应,对于鉴权来说错误响应也就是 401

// https://github.com/openfaas/faas/blob/df97efafae36ce7093ad353e3e6acc0e93d6300e/gateway/handlers/external_auth.go#L10
func MakeExternalAuthHandler(next http.HandlerFunc, upstreamTimeout time.Duration, upstreamURL string, passBody bool) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		// 向 upstreamURL 发送 GET 请求
		req, _ := http.NewRequest(http.MethodGet, upstreamURL, nil)

		// 通过 Context 设置超时,如果返回 200 说明鉴权成功,否则把报错返回
		res, err := http.DefaultClient.Do(req.WithContext(deadlineContext))
		if res.StatusCode == http.StatusOK {
			next.ServeHTTP(w, r)
			return
		}

		if res.Body != nil {
			io.Copy(w, res.Body)
		}
	}
}

初始化 Exporter

Gateway 实现 Provider Exporter 功能(Exporter 属于 Prometheus 的概念),Gateway 抓取所有 function 的状态信息,从用于 Prometheus 监控数据。

具体 Prometheus 指标是通过 BuildMetricsOptions 声明的。

然启动一个协程进行轮训

// https://github.com/openfaas/faas/blob/df97efafae36ce7093ad353e3e6acc0e93d6300e/gateway/server.go#L55
// 生成具体的 Prometheus 指标
metricsOptions := metrics.BuildMetricsOptions()
exporter := metrics.NewExporter(metricsOptions, credentials)

// 启动一个协程进行轮训,定时获取 Provider 状态信息
exporter.StartServiceWatcher(*config.FunctionsProviderURL, metricsOptions, "func", servicePollInterval)

// exporter 实现了 Prometheus.Collector 接口,然后注册到 Prometheus,会被 Prometheus 抓取
metrics.RegisterExporter(exporter)

// 轮训具体实现
// https://github.com/openfaas/faas/blob/df97efafae36ce7093ad353e3e6acc0e93d6300e/gateway/metrics/exporter.go#L66
func (e *Exporter) StartServiceWatcher(endpointURL url.URL, metricsOptions MetricOptions, label string, interval time.Duration) {
	ticker := time.NewTicker(interval)
	quit := make(chan struct{})

	// 使用统一的 client
	proxyClient := http.Client{}

	// 启动协程,通过 Provider 接口返回所有 function 的状态信息
	go func() {
		for {
			select {
			case <-ticker.C:
				// 请求接口,解析 json 数据
				get, _ := http.NewRequest(http.MethodGet, endpointURL.String()+"system/functions", nil)
				services := []types.FunctionStatus{}
				e.services = services
				break
			case <-quit:
				return
			}
		}
	}()
}

统一的反向代理 client

Gateway 转发 function 到 Provider,所以提供了统一的反向代理 Client,(todo: client 的优化)

reverseProxy := types.NewHTTPClientReverseProxy(config.FunctionsProviderURL,
		config.UpstreamTimeout,
		config.MaxIdleConns,
		config.MaxIdleConnsPerHost)

// https://github.com/openfaas/faas/blob/df97efafae36ce7093ad353e3e6acc0e93d6300e/gateway/types/proxy_client.go#L12
func NewHTTPClientReverseProxy(baseURL *url.URL, timeout time.Duration, maxIdleConns, maxIdleConnsPerHost int) *HTTPClientReverseProxy {
}

转发请求

对于 function 类的操作,创建统一的转发方法,转发请求。

// https://github.com/openfaas/faas/blob/df97efafae36ce7093ad353e3e6acc0e93d6300e/gateway/server.go#L99
faasHandlers.Proxy = handlers.MakeForwardingProxyHandler(reverseProxy, functionNotifiers, functionURLResolver, functionURLTransformer, nil)
faasHandlers.ListFunctions = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector)
// 多个请求 ...

为了支持多种不同的情况,所以参数都是以 interface 的形式传入的

  • proxy 统一的反向代理 Client,proxy 即为上一步生成的统一 Client
  • notifiers 通知列表,完成后逐一进行通知,
  • baseURLResolver 实现对上游请求地址的解析
  • urlPathTransformer 把请求转换成符合上游地址,统一请求地址
  • serviceAuthInjector BasicAuth 注入
// https://github.com/openfaas/faas/blob/df97efafae36ce7093ad353e3e6acc0e93d6300e/gateway/handlers/forwarding_proxy.go#L40

func MakeForwardingProxyHandler(proxy *types.HTTPClientReverseProxy,
	notifiers []HTTPNotifier,
	baseURLResolver BaseURLResolver,
	urlPathTransformer URLPathTransformer,
	serviceAuthInjector AuthInjector) http.HandlerFunc {

	writeRequestURI := false
	if _, exists := os.LookupEnv("write_request_uri"); exists {
		writeRequestURI = exists
	}

	return func(w http.ResponseWriter, r *http.Request) {
		baseURL := baseURLResolver.Resolve(r)
		originalURL := r.URL.String()

		// 解析具体的请求地址,使通过 Provider 转发或直接发送到 function 的请求统一
		requestURL := urlPathTransformer.Transform(r)

		// 发起转发请求
		statusCode, err := forwardRequest(w, r, proxy.Client, baseURL, requestURL, proxy.Timeout, writeRequestURI, serviceAuthInjector)

		// 向通知列表发起通知
		for _, notifier := range notifiers {
			notifier.Notify(r.Method, requestURL, originalURL, statusCode, seconds)
		}

	}
}

forwardRequest 即是转发请求函数

Gateway 通过配置项 DirectFunctions 支持两种转发请求的方式,一种为转发到 Provider,通过 Provider 再转发到 function,另一种为直接转发到 function。MakeForwardingProxyHandler 通过 interface 形式的参数支持多种情况,只需要根据配置项实现接口即可。

var functionURLResolver handlers.BaseURLResolver
var functionURLTransformer handlers.URLPathTransformer

这两种方式的区别是 BaseURLResolver 和 URLPathTransformer 接口实现的方式不同,用于生成统一的 Request,使后端 function 对此转发策略无感,function 收到的请求是统一形式的

转发到 Provider

转发到 Provider 是默认转发方式,所以整体处理起来,直接返回即可

urlResolver := handlers.SingleHostBaseURLResolver{BaseURL: config.FunctionsProviderURL.String()}
nilURLTransformer := handlers.TransparentURLPathTransformer{}

// https://github.com/openfaas/faas/blob/df97efafae36ce7093ad353e3e6acc0e93d6300e/gateway/handlers/forwarding_proxy.go#L165
type SingleHostBaseURLResolver struct {
	BaseURL string
}

func (s SingleHostBaseURLResolver) Resolve(r *http.Request) string {

	baseURL := s.BaseURL

	if strings.HasSuffix(baseURL, "/") {
		baseURL = baseURL[0 : len(baseURL)-1]
	}
	return baseURL
}

// https://github.com/openfaas/faas/blob/df97efafae36ce7093ad353e3e6acc0e93d6300e/gateway/handlers/forwarding_proxy.go#L199
type TransparentURLPathTransformer struct {
}

func (f TransparentURLPathTransformer) Transform(r *http.Request) string {
	return r.URL.Path
}

转发到 function

Resolve 的主要作用为通过 Request Path 中的 function 名字,拼接出调用 function 的 URL

Transform 主要作用是把 Path 中 /function/servicename/ 移除,使后端 function 收到的请求 Request URL 与通过 Provider 转发过来的一致

functionURLResolver = handlers.FunctionAsHostBaseURLResolver{FunctionSuffix: config.DirectFunctionsSuffix}
functionURLTransformer = handlers.FunctionPrefixTrimmingURLPathTransformer{}

// https://github.com/openfaas/faas/blob/df97efafae36ce7093ad353e3e6acc0e93d6300e/gateway/handlers/forwarding_proxy.go#L181
type FunctionAsHostBaseURLResolver struct {
	FunctionSuffix string
}

// 获得 function 请求地址
func (f FunctionAsHostBaseURLResolver) Resolve(r *http.Request) string {
	svcName := getServiceName(r.URL.Path)

	const watchdogPort = 8080
	var suffix string
	if len(f.FunctionSuffix) > 0 {
		suffix = "." + f.FunctionSuffix
	}

	return fmt.Sprintf("http://%s%s:%d", svcName, suffix, watchdogPort)
}

var functionMatcher = regexp.MustCompile("^/?(?:async-)?function/([^/?]+)([^?]*)")

// https://github.com/openfaas/faas/blob/df97efafae36ce7093ad353e3e6acc0e93d6300e/gateway/handlers/forwarding_proxy.go#L208
type FunctionPrefixTrimmingURLPathTransformer struct {
}

// 从 path 中移除 `/function/servicename/`
func (f FunctionPrefixTrimmingURLPathTransformer) Transform(r *http.Request) string {
	ret := r.URL.Path

	if ret != "" {
		matcher := functionMatcher.Copy()
		parts := matcher.FindStringSubmatch(ret)
		if len(parts) == hasPathCount {
			ret = parts[pathIndex]
		}
	}

	return ret
}

自动伸缩

OpenFaas 最出彩的功能是自动伸缩,OpenFaas 的伸缩通过三种方式,一种是通过 API 接口手动伸缩,另一种是通过 Prometheus 的 AlertManager 报警规则触发 WebHook 实现(也就是自动伸缩),还可以通过 Kubernetes HPAv2 实现,并 HPAv2 支持 CPU、memory 或者其他自定义指标进行

手动伸缩

通过 API 接口调用即可实现 function 的伸缩,具体实现是把请求转发到 Provider,由 Provider 实现自动伸缩

faasHandlers.ScaleFunction = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector)

r.HandleFunc("/system/scale-function/{name:[-a-zA-Z_0-9]+}", faasHandlers.ScaleFunction).Methods(http.MethodPost)

自动伸缩

根据 AlertManager 的报警规则 alertmanager.yml,如果请求数到达一定标准则触发 hook,进行自动伸缩。

alertHandler := plugin.NewExternalServiceQuery(*config.FunctionsProviderURL, serviceAuthInjector)
faasHandlers.Alert = handlers.MakeNotifierWrapper(
	handlers.MakeAlertHandler(alertHandler),
	forwardingNotifiers,
)

r.HandleFunc("/system/alert", faasHandlers.Alert).Methods(http.MethodPost)

NewExternalServiceQuery 返回 scaling.ServiceQuery 接口,主要用于查询当前副本数量和设置副本数量,用于对后端 Pod 进行伸缩

由于 OpenFaas 支持多种后端(例如:k8s、swarm等),所以具体伸缩实现是不同的,需要通过不同的 Provider 实现进行处理,具体查询和设置的方式是通过发送请求到 Provider 处理

// https://github.com/openfaas/faas/blob/df97efafae36ce7093ad353e3e6acc0e93d6300e/gateway/plugin/external.go#L60

func (s ExternalServiceQuery) GetReplicas(serviceName string) (scaling.ServiceQueryResponse, error) {
	
	// 向 Provider 发送查询请求,返回具体 function 的状态
	urlPath := fmt.Sprintf("%ssystem/function/%s", s.URL.String(), serviceName)
	req, _ := http.NewRequest(http.MethodGet, urlPath, nil)
	res, err := s.ProxyClient.Do(req)

	// 解析响应,返回统一的 struct
	if res.StatusCode == http.StatusOK {
		bytesOut, _ := ioutil.ReadAll(res.Body)
		err = json.Unmarshal(bytesOut, &function)
	}

	return scaling.ServiceQueryResponse{
		Replicas:          function.Replicas,
		MaxReplicas:       maxReplicas,
		MinReplicas:       minReplicas,
		ScalingFactor:     scalingFactor,
		AvailableReplicas: availableReplicas,
	}, err
}

func (s ExternalServiceQuery) SetReplicas(serviceName string, count uint64) error {
	var err error

	scaleReq := ScaleServiceRequest{
		ServiceName: serviceName,
		Replicas:    count,
	}

	// 向 Provider 发送增加副本的请求,通过 Provider 进行处理
	urlPath := fmt.Sprintf("%ssystem/scale-function/%s", s.URL.String(), serviceName)
	req, _ := http.NewRequest(http.MethodPost, urlPath, bytes.NewReader(requestBody))

	if !(res.StatusCode == http.StatusOK || res.StatusCode == http.StatusAccepted) {
		err = fmt.Errorf("error scaling HTTP code %d, %s", res.StatusCode, urlPath)
	}

	return err
}

MakeNotifierWrapper 是一个简单的包装函数,在运行完 http.HandlerFunc 后可以逐一向 notifiers 发送通知。newWriteInterceptor 通过对 http.ResponseWriter 返回一个可以获得状态码的新 http.ResponseWriter

func MakeNotifierWrapper(next http.HandlerFunc, notifiers []HTTPNotifier) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		then := time.Now()

		writer := newWriteInterceptor(w)
		next(&writer, r)

		url := r.URL.String()
		for _, notifier := range notifiers {
			notifier.Notify(r.Method, url, url, writer.Status(), time.Since(then))
		}
	}
}

MakeAlertHandler 是具体的伸缩逻辑

  • 解析报警信息
  • 对报警的 function 逐一处理
  • 获取当前 function 副本数量
  • 计算需要的副本数量
  • 把当前副本数量变为所需的值
// https://github.com/openfaas/faas/blob/df97efafae36ce7093ad353e3e6acc0e93d6300e/gateway/handlers/alerthandler.go#L17
// 接收 Prometheus 的报警信息,对 function 进行伸缩处理
func MakeAlertHandler(service scaling.ServiceQuery) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {

		// 读取报警信息详情
		body, readErr := ioutil.ReadAll(r.Body)

		// 解析报警信息
		var req requests.PrometheusAlert
		err := json.Unmarshal(body, &req)
		
		// 根据规则对有问题的 function 逐一进行处理
		errors := handleAlerts(&req, service)
		w.WriteHeader(http.StatusOK)
	}
}

// https://github.com/openfaas/faas/blob/df97efafae36ce7093ad353e3e6acc0e93d6300e/gateway/handlers/alerthandler.go#L60
// 逐一对 function 进行处理
func handleAlerts(req *requests.PrometheusAlert, service scaling.ServiceQuery) []error {
	for _, alert := range req.Alerts {
		err := scaleService(alert, service);
	}
}

// https://github.com/openfaas/faas/blob/df97efafae36ce7093ad353e3e6acc0e93d6300e/gateway/handlers/alerthandler.go#L72
// 具体处理 function 的逻辑
func scaleService(alert requests.PrometheusInnerAlert, service scaling.ServiceQuery) error {
	var err error
	serviceName := alert.Labels.FunctionName

	if len(serviceName) > 0 {

		// 查询 function 的副本数量
		queryResponse, getErr := service.GetReplicas(serviceName)
		if getErr == nil {
			status := alert.Status
			
			// 计算需要伸缩的具体数量
			newReplicas := CalculateReplicas(status, queryResponse.Replicas, uint64(queryResponse.MaxReplicas), queryResponse.MinReplicas, queryResponse.ScalingFactor)

			// 伸缩 function 数量至指定数量
			updateErr := service.SetReplicas(serviceName, newReplicas)
			if updateErr != nil {
				err = updateErr
			}
		}
	}
	return err
}

通过 CalculateReplicas 计算需要副本的数量,默认值见代码

  • maxReplicas(最大副本数): 默认值为 100,可以通过 com.openfaas.scale.max 进行设置
  • minReplicas(最小副本数): 最小值为 1,可以通过 com.openfaas.scale.min 进行设置
  • scalingFactor(伸缩比例): 伸缩比例,和设置最大副本一起计算出需要增大的副本数量,可以通过 com.openfaas.scale.factor 进行设置

具体计算公式是 math.Ceil(maxReplicas / 100 * scalingFactor)修改默认见代码

// https://github.com/openfaas/faas/blob/df97efafae36ce7093ad353e3e6acc0e93d6300e/gateway/handlers/alerthandler.go#L97
// 根据 current/desired 来计算具体需要伸缩到的数量
func CalculateReplicas(status string, currentReplicas uint64, maxReplicas uint64, minReplicas uint64, scalingFactor uint64) uint64 {
	newReplicas := currentReplicas
	step := uint64(math.Ceil(float64(maxReplicas) / 100 * float64(scalingFactor)))

	if status == "firing" && step > 0 {
		if currentReplicas+step > maxReplicas {
			newReplicas = maxReplicas
		} else {
			newReplicas = currentReplicas + step
		}
	} else { // Resolved event.
		newReplicas = minReplicas
	}

	return newReplicas
}

异步调用 function

异步调用 function 是通过 NATS 队列实现。把调用 function 的信息推送到 queue-worker 中,由 worker 执行 function,并把运行结果发送给 /system/async-report 路由。具体执行是通过 nats-queue-worker 仓库代码实现的

NATS 属于小众的中间件产品,与 Kafaka、RabbitMQ、ActiveMQ 和 NSQ 类似都属于消息中间件,通俗讲就是消息队列。 NATS 官网

异步调用函数即是把 function 推送到消息队列中,由消息系列进行异步处理

// 创建 NATS 配置文件
defaultNATSConfig := natsHandler.NewDefaultNATSConfig(maxReconnect, interval)

// 连接 NATS
natsQueue, queueErr := natsHandler.CreateNATSQueue(*config.NATSAddress, *config.NATSPort, defaultNATSConfig)

faasHandlers.QueuedProxy = handlers.MakeNotifierWrapper(
	handlers.MakeCallIDMiddleware(handlers.MakeQueuedProxy(metricsOptions, true, natsQueue, functionURLTransformer)),
	forwardingNotifiers,
)

r.HandleFunc("/async-function/{name:[-a-zA-Z_0-9]+}/", faasHandlers.QueuedProxy).Methods(http.MethodPost)
r.HandleFunc("/async-function/{name:[-a-zA-Z_0-9]+}", faasHandlers.QueuedProxy).Methods(http.MethodPost)
r.HandleFunc("/async-function/{name:[-a-zA-Z_0-9]+}/{params:.*}", faasHandlers.QueuedProxy).Methods(http.MethodPost)

// https://github.com/openfaas/faas/blob/df97efafae36ce7093ad353e3e6acc0e93d6300e/gateway/handlers/queue_proxy.go#L16
func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, canQueueRequests queue.CanQueueRequests, pathTransformer URLPathTransformer) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {

		// 读取请求 body
		body, err := ioutil.ReadAll(r.Body)

		// 获得 function 名称
		vars := mux.Vars(r)
		name := vars["name"]

		// 获得回调地址
		callbackURLHeader := r.Header.Get("X-Callback-Url")
		var callbackURL *url.URL

		// 设置 function 执行完的回调地址
		if len(callbackURLHeader) > 0 {
			urlVal, urlErr := url.Parse(callbackURLHeader)
			callbackURL = urlVal
		}

		req := &queue.Request{
			Function:    name,
			Body:        body,
			Method:      r.Method,
			QueryString: r.URL.RawQuery,
			Path:        pathTransformer.Transform(r),
			Header:      r.Header,
			Host:        r.Host,
			CallbackURL: callbackURL,
		}

		// 把请求推送到队列中
		canQueueRequests.Queue(req)
	}

接收异步 function 报告

当异步 function 调用完成,则会向 /system/async-report 路由发送运行结果,用于 Prometheus 监控

  • 读取运行结果,包含:函数名、运行结果、运行时间
  • 添加到 Prometheus 数据中
r.HandleFunc("/system/async-report", handlers.MakeNotifierWrapper(faasHandlers.AsyncReport, forwardingNotifiers))

// https://github.com/openfaas/faas/blob/df97efafae36ce7093ad353e3e6acc0e93d6300e/gateway/handlers/asyncreport.go#L15
func MakeAsyncReport(metrics metrics.MetricOptions) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {

		// 读取 function 的运行结果,包含:函数名、运行结果、运行时间
		report := requests.AsyncReport{}
		bytesOut, _ := ioutil.ReadAll(r.Body)
		json.Unmarshal(bytesOut, &report)

		// 添加到 Prometheus 监控数据中
		trackInvocation(report.FunctionName, metrics, report.StatusCode)
		trackTimeExact(taken, metrics, report.FunctionName)
	}
}

从0增加副本到的最小值

如果配置项 ScaleFromZero 为 true,则在调用 function 时会判断 function 副本数,然后 function 自动伸缩到最小副本数

逻辑比较简单暴力

  • 检查副本数,如果为 0 增加副本数
  • 循环增加副本数,直至达到目的
scalingConfig := scaling.ScalingConfig{
	MaxPollCount:         uint(1000),
	SetScaleRetries:      uint(20),
	FunctionPollInterval: time.Millisecond * 50,
	CacheExpiry:          time.Second * 5, // freshness of replica values before going stale
	ServiceQuery:         alertHandler,
}

functionProxy = handlers.MakeScalingHandler(faasHandlers.Proxy, scalingConfig)

// https://github.com/openfaas/faas/blob/df97efafae36ce7093ad353e3e6acc0e93d6300e/gateway/handlers/scaling.go#L13
// MakeScalingHandler 把 function 副本数从 0 增加到最小副本数
// 等伸缩完成即可正常调用 function
func MakeScalingHandler(next http.HandlerFunc, config scaling.ScalingConfig) http.HandlerFunc {

	scaler := scaling.NewFunctionScaler(config)

	return func(w http.ResponseWriter, r *http.Request) {

		// 根据 Path 获得 function 名称,查询 function 状态,如果副本数为 0 则进行增大副本数至最小值
		functionName := getServiceName(r.URL.String())
		res := scaler.Scale(functionName)

		// 执行具体的 function 功能
		if res.Available {
			next.ServeHTTP(w, r)
			return
		}
	}
}

func (f *FunctionScaler) Scale(functionName string) FunctionScaleResult {
	start := time.Now()

	// 如果可以从缓存中获得 function 的信息则直接返回
	if cachedResponse, hit := f.Cache.Get(functionName); hit &&
		cachedResponse.AvailableReplicas > 0 {
		return
	}

	// 查询 function 的信息
	queryResponse, err := f.Config.ServiceQuery.GetReplicas(functionName)

	// 增加到缓存中
	f.Cache.Set(functionName, queryResponse)

	// 如果可用副本数量为 0 则增大副本数
	if queryResponse.AvailableReplicas == 0 {

		// 多次执行伸缩,直到 function 增加到指定数量
		scaleResult := backoff(func(attempt int) error {
			queryResponse, err := f.Config.ServiceQuery.GetReplicas(functionName)
			f.Cache.Set(functionName, queryResponse)
			setScaleErr := f.Config.ServiceQuery.SetReplicas(functionName, minReplicas)
			return nil

		}, int(f.Config.SetScaleRetries), f.Config.FunctionPollInterval)

		if scaleResult != nil {
			return
		}

		// 再执行 1000 次???直到伸缩成功
		for i := 0; i < int(f.Config.MaxPollCount); i++ {
			queryResponse, err := f.Config.ServiceQuery.GetReplicas(functionName)
			if err == nil {
				f.Cache.Set(functionName, queryResponse)
			}
			if err != nil {
				return
			}

			if queryResponse.AvailableReplicas > 0 {
				return	
			}
		}
	}

	return
}

type routine func(attempt int) error

// 多次尝试执行,直到执行成功或者尝试次数用完
func backoff(r routine, attempts int, interval time.Duration) error {
	for i := 0; i < attempts; i++ {
		res := r(i)
	}
	return err
}

其他功能

ListFunctions

ListFunctions 路由不仅会从 Provider 读取信息,还会通过 HTTP API 调用 Prometheus,获得调用次数等信息,根据 AddMetricsHandler 源码可知

  • 调用 Provider 接口返回基本信息
  • 通过 Prometheus HTTP API 查询 function 的基本信息,例如调用次数
  • 整合两个信息,输出
faasHandlers.ListFunctions = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector)
prometheusQuery := metrics.NewPrometheusQuery(config.PrometheusHost, config.PrometheusPort, &http.Client{})
faasHandlers.ListFunctions = metrics.AddMetricsHandler(faasHandlers.ListFunctions, prometheusQuery)

func AddMetricsHandler(handler http.HandlerFunc, prometheusQuery PrometheusQueryFetcher) http.HandlerFunc {

	return func(w http.ResponseWriter, r *http.Request) {

		// 创建新的 http.ResponseWriter,用于捕获执行 handler 的响应
		recorder := httptest.NewRecorder()
		handler.ServeHTTP(recorder, r)
		upstreamCall := recorder.Result()

		// 读取 handler 的响应,并解析
		upstreamBody, _ := ioutil.ReadAll(upstreamCall.Body)
		var functions []types.FunctionStatus
		err := json.Unmarshal(upstreamBody, &functions)

		// 通过调用 Prometheus HTTP API 查询 function 信息
		expr := url.QueryEscape(`sum(gateway_function_invocation_total{function_name=~".*", code=~".*"}) by (function_name, code)`)
		results, fetchErr := prometheusQuery.Fetch(expr)
		
		// 整合两个查询的信息
		mixIn(&functions, results)

		// 输出
		bytesOut, marshalErr := json.Marshal(functions)
		w.Write(bytesOut)
	}
}

// 把 Prometheus 查询到的调用此处整合到 function 信息中
func mixIn(functions *[]types.FunctionStatus, metrics *VectorQueryResponse) {
	for i, function := range *functions {
		for _, v := range metrics.Data.Result {
			if v.Metric.FunctionName == function.Name {
					(*functions)[i].InvocationCount += f
				}
			}
		}
	}
}

为 function 调用增加追踪 ID

faasHandlers.Proxy = handlers.MakeCallIDMiddleware(faasHandlers.Proxy)
func MakeCallIDMiddleware(next http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		start := time.Now()
		if len(r.Header.Get("X-Call-Id")) == 0 {
			callID := uuid.Generate().String()
			r.Header.Add("X-Call-Id", callID)
			w.Header().Add("X-Call-Id", callID)
		}

		r.Header.Add("X-Start-Time", fmt.Sprintf("%d", start.UTC().UnixNano()))
		w.Header().Add("X-Start-Time", fmt.Sprintf("%d", start.UTC().UnixNano()))

		next(w, r)
	}
}

总结

OpenFaas Gateway 耦合度很低,源码还是比较简单的。

用到了很多 func (next http.HandlerFunc) http.HandlerFunc {return func(w http.ResponseWriter, r *http.Request) {}} 中间件的方法。

其中统一 Client 需要深入研究,NATS 也是一个新的知识点

Gateway 自身的 Basic Auth 还需要深入探索一下