Elasticsearch+Fluent

Elasticsearch+Fluent

   夏克东     2023年11月25日 14:16     809    

是用于Linux、macOS、Windows和BSD系列操作系统的日志、度量和跟踪的快速轻量级遥测代理。它非常注重性能,可以在不复杂的情况下收集和处理来自不同来源的遥测数据。

按照服务名区分索引名

日志是能收集了,但是所有的服务的日志,当天都放到一个索引,类似  “brazil-logs-<yyyy.mm.dd>”  这样的索引中。本地还好,如果生产环境的话,那一天单个索引就十分庞大了。而且对于不同服务日志保留天数做差异化保留的时候也不好处理,也不好直观展示各个服务日志大小。

所以能按照brazil-<service-name>-<yyyy.mm.dd>格式的话,对我们会更为理想点。截止目前的版本,没有现成的配置可以实现这个。经过研究,可以利用fluentbit的 logstash_prefix_key 这个来实现。logstash_prefix是固定的前缀,logstash_prefix_key则可以动态读取 key来作为索引名。

1.修改fluent-operator,让其支持logstash_prefix_key

经过测试,fluent-operator当前版本的模板并没有对该字段进行处理,所以要进行修改让它支持这个字段。

我需要output到es,所以我就修改了es相关的output,其它输出源的可以自己检查下。

修改 ./templates/fluentbit-output-elasticsearch.yaml

在Values.fluentbit.output.es.logstashPrefix之后增加 .Values.fluentbit.output.es.logstashPrefixKey 这段的配置,如下:

{{- if .Values.fluentbit.output.es.logstashPrefix }}
    logstashPrefix: {{ .Values.fluentbit.output.es.logstashPrefix | default "ks-logstash-log" | quote }}
{{- end }}
{{- if .Values.fluentbit.output.es.logstashPrefixKey }}
    logstashPrefixKey: {{ .Values.fluentbit.output.es.logstashPrefixKey | default "ks-logstash-log-key" | quote }}
{{- end }}

2.让它能获取app name作为服务名

./values.yaml 修改fluentbit部分,开启kubernetes labels,因为labels的app标签就是我们的服务名。

filter:
  kubernetes:
    enable: true
    labels: true

3. 拼接索引名

由于logstash_prefix_key只能接受 key,且不支持嵌套的对象的key。

举个直观的例子,假设你的对象是这样的:

{
  "file": "systemd.log",
  "kubernetes": {
     "labels": {
        "app": "demo-service"
     }
  }
}

像 brazil-$kubernets["labels"]["app"] 和 像 $kubernets["labels"]["app"]  这样的值,是取不到任何内容的。像上面的例子,只接受 $file 这个值。

所以这里要实现我们的目标,我们得自己拼接出一个新的key,作为索引名。类似下面:

{
  "file": "systemd.log",
  "app_name": "brazil-demo-service",
  "kubernetes": {
     "labels": {
        "app": "demo-service"
     }
  }
}

我是这样实现的,利用lua的filter,通过取出kubernetes的labels,拼接新的值。

我修改的是 cat ./templates/fluentbit-containerd-config.yaml 增加了一个 add_k8s_app_name_field 函数

{{- if .Values.Kubernetes -}}
{{- if .Values.fluentbit.enable -}}
{{- if .Values.fluentbit.filter.containerd.enable -}}
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluent-bit-containerd-config
data:
  containerd.lua: |
    function containerd( tag, timestamp, record)
           if(record["logtag"]~=nil)
           then
           timeStr = os.date("!*t",  timestamp["sec"])
            t = string.format("%4d-%02d-%02dT%02d:%02d:%02d.%sZ",
            timeStr["year"], timeStr["month"], timeStr["day"],
            timeStr["hour"], timeStr["min"], timeStr["sec"],
            timestamp["nsec"]);
            record["time"] = t;
            record["log"] = record["message"];
            record["message"] =  nil;
            return 1, timestamp, record
            else
            return 0,timestamp,record
           end
    end

    function add_k8s_app_name_field(tag, timestamp, record)
        retcode = 0
        prefix = 'brazil-prod' 
        app_name = record['kubernetes']['labels']['app']
        if  app_name ~= nil then
            app_name = prefix .. '-' .. app_name
            
            if app_name ~= nil then
                record['app_name'] = app_name
                retcode = 2
            end
        end
        
        return retcode, timestamp, record
    end
{{- end }}
{{- end }}
{{- end }}

修改 templates/fluentbit-clusterfilter-kubernetes.yaml 增加新增的lua filter函数 对kubernetes标签进行处理

cat templates/fluentbit-clusterfilter-kubernetes.yaml

{{- if .Values.Kubernetes -}}
{{- if .Values.fluentbit.enable -}}
{{- if .Values.fluentbit.filter.kubernetes.enable -}}
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterFilter
metadata:
  name: kubernetes
  labels:
    fluentbit.fluent.io/enabled: "true"
    fluentbit.fluent.io/component: logging
spec:
  match: kube.*
  filters:
  - kubernetes:
      kubeURL: https://kubernetes.default.svc:443
      kubeCAFile: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
      kubeTokenFile: /var/run/secrets/kubernetes.io/serviceaccount/token
      {{- $params := omit .Values.fluentbit.filter.kubernetes "enable" }}
      {{- if .Values.fluentbit.output.stdout.enable }}
      {{- $_ := set $params "k8sLoggingExclude" true -}}
      {{- end }}
      {{- with $params }}
      {{- . | toYaml | nindent 6 }}
      {{- end }}
  - lua:
      script:
        key: containerd.lua
        name: fluent-bit-containerd-config
      call: add_k8s_app_name_field
      timeAsTable: true
  - nest:
      operation: lift
      nestedUnder: kubernetes
      addPrefix: kubernetes_
  - modify:
      rules:
      - remove: stream
      - remove: kubernetes_pod_id
      - remove: kubernetes_docker_id
      - remove: kubernetes_container_hash
      - remove: kubernetes_labels
  - nest:
      operation: nest
      wildcard:
      - kubernetes_*
      nestUnder: kubernetes
      removePrefix: kubernetes_
{{- end }}
{{- end }}
{{- end }}

主要增加了

- lua:
    script:
      key: containerd.lua
      name: fluent-bit-containerd-config
    call: add_k8s_app_name_field
    timeAsTable: true

到这里基本就满足所有的条件了,为了不影响systemd的日志收集和归类,也给它的lua filter增加app_name字段.

./templates/fluentbit-lua-config.yaml

new_record["app_name"] = "systemd"

经过前面的修改后,只要在values.yaml 设置 fluentbit.output.es.logstashPrefixKey=”$app_name” 即可

4. 应用变更

我修改过的values.yaml,去除了无关部分后主要如下:

Kubernetes: true
fluentbit:
  crdsEnable: true
  enable: true
  image:
    repository: "hub.xxxx.com/library/fluent-bit"
    tag: "v2.0.11"
  input:
    tail:
      enable: true
      refreshIntervalSeconds: 10
      memBufLimit: 50MB
      path: "/var/log/containers/*.log"
      skipLongLines: false
    systemd:
      enable: true
      path: "/var/log/journal"
      includeKubelet: true
  output:
    es:
      enable: true
      # 如果多个host的话,用hosts
      host: "es.xxx.local"
      port: 9200
      logstashFormat: true
      logstashPrefixKey: "$app_name"
  filter:
    kubernetes:
      enable: true
      labels: true
      annotations: false
      k8sLoggingExclude: true
    containerd:
      enable: true
    systemd:
      enable: true

再来helm更新一下,搞定

helm upgrade --install fluent-operator --create-namespace -n fluent /app/fluent-operator/ --set containerRuntime=containerd

5.其他配置参数详解

Input

[Input]
    Name    systemd     //使用systemd输入插件从systemd或journaled读取日志
    Path    /var/log/journal  //采集k8s未运行在容器中的集群组件日志: Docker
    DB    /fluent-bit/tail/docker.db
    DB.Sync    Normal
    Tag    service.docker   //定义Tag用于识别数据源
    Systemd_Filter    _SYSTEMD_UNIT=docker.service  //采集当前节点docker服务日志(_SYSTEM_UNIT必须加下划线) 
[Input]
    Name    systemd
    Path    /var/log/journal  //采集k8s未运行在容器中的集群组件日志: Kubelet
    DB    /fluent-bit/tail/kubelet.db
    DB.Sync    Normal
    Tag    service.kubelet
    Systemd_Filter    _SYSTEMD_UNIT=kubelet.service  //采集当前节点kubelet服务日志 
[Input]
    Name    tail   //使用tail输入插件
    Path    /var/log/containers/*.log    //采集k8s应用Pod日志和运行在容器中的集群组件日志(Kubernetes scheduler 和 kube-proxy、etcd等)
    Exclude_Path    /var/log/containers/*_cloudbases-logging-system_events-exporter*.log,/var/log/containers/kube-auditing-webhook*_cloudbases-logging-system_kube-auditing-webhook*.log    //使用通配符排除日志文件采集
    Refresh_Interval    10   
    Skip_Long_Lines    true   
    DB    /fluent-bit/tail/pos.db
    DB.Sync    Normal
    Mem_Buf_Limit    5MB      
    Parser    docker          
    Tag    kube.*

过滤——Filter

使用kubernetes过滤器插件为应用容器日志和运行在容器中的k8s集群组件日志添加kubernetes元数据

[Filter]  
    Name    kubernetes
    Match    kube.*   //匹配输入模块中的Tag,即匹配上文中的使用tail插件的那个input模块
    Kube_URL    https://kubernetes.default.svc:443
    Kube_CA_File    /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
    Kube_Token_File    /var/run/secrets/kubernetes.io/serviceaccount/token
    Labels    false   //不将标签添加到容器日志中
    Annotations    false    //不将注解添加到容器日志中

经过kubernetes过滤器插件后,此时容器的日志格式为:

使用nest过滤器插件对应用容器和运行在容器中的k8s集群组件的嵌套日志进行操作

通过modify调整应用容器和运行在容器中的k8s集群组件日志字段

[Filter]
    Name    modify
    Match    kube.*
    Remove    stream  //移除stream字段
    Remove    kubernetes_pod_id  //移除kubernetes_pod_id字段
    Remove    kubernetes_host    //移除kubernetes_host字段
    Remove    kubernetes_container_hash   //移除kubernetes_container_hash字段

经过nest过滤器插件后,此时容器的日志格式为:

经过以上4个过滤器插件后,将应用容器日志和运行在容器中的k8s集群组件日志过滤和修改成了想要的格式,当以上配置不满足公司业务需求时,对应调整过滤器模块配置即可。

Output

将数据发送到不同的目的地。

 

[Output]
    Name    es  //输出插件使用es
    Match_Regex    (?:kube|service)\.(.*)   //在输出模块配置中指定 Match 规则,Match输入模块中的Tag,这样通过标签和匹配规则就能将数据路由到一个或多个目的地
    Host    elasticsearch-logging-data.logging-system.svc  //es地址
    Port    9200  //es端口
    Logstash_Format    true 
    Logstash_Prefix    brazil-prod-*
    Logstash_Prefix_Key brazil-prod-gwm....access.log
    Time_Key    @timestamp
    Generate_ID    true

Over


文章评论

2

其他文章