Browse Source

Add new helmchart

vkropotko 5 years ago
parent
commit
307c04e593

+ 5 - 0
Chart.yaml

@@ -0,0 +1,5 @@
+apiVersion: v1
+appVersion: "1.9.1"
+description: Chart for Apache Flink
+name: flink-helmchart
+version: 0.1.0

+ 6 - 0
OWNERS

@@ -0,0 +1,6 @@
+approvers:
+- vkropotko@riskfocus.com
+- rfvermut@riskfocus.com
+reviewers:
+- vkropotko@riskfocus.com
+- rfvermut@riskfocus.com

+ 77 - 1
README.md

@@ -1 +1,77 @@
-# flink-helmchart
+# Apache Flink Helm Chart
+
+This is an implementation of https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html
+
+## Prerequisites
+
+* Kubernetes 1.3 with alpha APIs enabled and support for storage classes
+
+* PV support on underlying infrastructure
+
+* Requires at least `v2.0.0-beta.1` version of helm to support
+  dependency management with requirements.yaml
+
+## StatefulSet Details
+
+* https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/
+
+## StatefulSet Caveats
+
+* https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/#limitations
+
+## Chart Details
+
+This chart will do the following:
+
+* Implement a dynamically scalable Flink (Jobmanagers and Taskmanagers) cluster using Kubernetes StatefulSets
+
+* Implement a dynamically scalable zookeeper cluster as another Kubernetes StatefulSet required for the Flink cluster above
+
+### Installing the Chart
+
+To install the chart with the release name `my-flink` in the default
+namespace:
+
+```
+$ git clone https://github.com/riskfocus/flink-helmchart
+$ helm install --name my-flink ./flink-helmchart
+```
+
+If using a dedicated namespace (recommended), make sure the namespace
+exists first:
+
+```
+$ git clone https://github.com/riskfocus/flink-helmchart
+$ kubectl create ns flink
+$ helm install --name my-flink --namespace flink ./flink-helmchart
+```
+
+This chart can include a ZooKeeper chart as a dependency (declared in its
+`requirements.yaml`) to support Jobmanager HA mode for the Flink cluster.
+The chart can be customized using the following configurable parameters:
+
+| Parameter                                | Description                                                                                                                                                              | Default                |
+|------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------|
+| `image.repository`                       | Flink Container image name                                                                                                                                               | `flink`                |
+| `image.tag`                              | Flink Container image tag                                                                                                                                                | `1.9.1-scala_2.12`     |
+| `image.pullPolicy`                       | Flink Containers pull policy                                                                                                                                             | `IfNotPresent`         |
+| `flink.monitoring.enabled`               | Enable flink monitoring                                                                                                                                                  | `true`                 |
+| `jobmanager.highAvailability.enabled`    | Enabled jobmanager HA mode key                                                                                                                                           | `false`                |
+| `jobmanager.highAvailability.storageDir` | storageDir for Jobmanager in HA mode                                                                                                                                     | `null`                 |
+| `jobmanager.replicaCount`                | Jobmanagers count context                                                                                                                                                | `1`                    |
+| `jobmanager.heapSize`                    | Jobmanager HeapSize options                                                                                                                                             | `1g`                   |
+| `jobmanager.resources`                   | Jobmanager resources                                                                                                                                                     | `{}`                 |
+| `taskmanager.resources`    | Taskmanager Resources key                                                                                                                                           | `{}`                |
+| `taskmanager.heapSize` | Taskmanager heapSize mode                                                                                                                                     | `1g`                 |
+| `taskmanager.replicaCount`               | Taskmanager replica count                                                                                                                                                | `1`                    |
+| `taskmanager.numberOfTaskSlots`                   | Number of Taskmanager taskSlots resources                                                                                                                                                     | `1`                 |
+| `taskmanager.resources`                   | Taskmanager resources                                                                                                                                                     | `{}`                 |
+| `zookeeper.enabled`                      | If True, installs Zookeeper Chart                                                                                                                                        | `false`                 |
+| `zookeeper.resources`                    | Zookeeper resource requests and limits                                                                                                                                   | `{}`                   |
+| `zookeeper.env`                          | Environment variables provided to Zookeeper                                                                                                                              | `{ZK_HEAP_SIZE: "1G"}` |
+| `zookeeper.storage`                      | Zookeeper Persistent volume size                                                                                                                                         | `2Gi`                  |
+| `zookeeper.image.PullPolicy`             | Zookeeper Container pull policy                                                                                                                                          | `IfNotPresent`         |
+| `zookeeper.url`                          | URL of Zookeeper Cluster (unneeded if installing Zookeeper Chart)                                                                                                        | `""`                   |
+| `zookeeper.port`                         | Port of Zookeeper Cluster                                                                                                                                                | `2181`                 |
+| `zookeeper.affinity`                     | Defines affinities and anti-affinities for pods as defined in: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity preferences | `{}`                   |
+| `zookeeper.nodeSelector`                 | Node labels for pod assignment                                                                                                                                           | `{}`                   |

BIN
charts/zookeeper-2.0.0.tgz


+ 0 - 22
flink/.helmignore

@@ -1,22 +0,0 @@
-# Patterns to ignore when building packages.
-# This supports shell glob matching, relative path matching, and
-# negation (prefixed with !). Only one pattern per line.
-.DS_Store
-# Common VCS dirs
-.git/
-.gitignore
-.bzr/
-.bzrignore
-.hg/
-.hgignore
-.svn/
-# Common backup files
-*.swp
-*.bak
-*.tmp
-*~
-# Various IDEs
-.project
-.idea/
-*.tmproj
-.vscode/

+ 0 - 5
flink/Chart.yaml

@@ -1,5 +0,0 @@
-apiVersion: v1
-appVersion: "1.0"
-description: A Helm chart for Kubernetes
-name: flink
-version: 0.1.4

+ 0 - 111
flink/values.yaml

@@ -1,111 +0,0 @@
-# Default values for flink.
-# This is a YAML-formatted file.
-# Declare variables to be passed into your templates.
-
-nameOverride: ""
-fullnameOverride: ""
-
-image:
-  repository: flink
-  tag: 1.9.1-scala_2.12
-  pullPolicy: IfNotPresent
-imagePullSecrets: []
-
-flink:
-  monitoring:
-    enabled: true
-    latency:
-      enabled: false
-      probingInterval: 1000
-    system:
-      enabled: true
-      probingInterval: 5000
-  workDir: /opt/flink
-
-extraEnvs: {}
-
-jobmanager:
-  extraEnvs: {}
-  ports:
-    rpc: 6123
-    blob: 6124
-    ui: 8081
-    metrics: 9999
-  replicaCount: 1
-  # g for Gigabytes, m for Megabytes
-  heapSize: 6g
-  resources:
-    limits:
-      cpu: 3800m
-      # Gi for Gigabytes, Mi for Megabytes
-      memory: 8000Mi
-  command: ["/bin/bash", "-c", "wget \
-  https://repo1.maven.org/maven2/org/apache/flink/flink-metrics-prometheus_2.12/1.9.1/flink-metrics-prometheus_2.12-1.9.1.jar \
-  -O /opt/flink/lib/flink-metrics-prometheus_2.12-1.9.1.jar && \
-  wget https://repo1.maven.org/maven2/com/github/oshi/oshi-core/3.4.0/oshi-core-3.4.0.jar \
-  -O /opt/flink/lib/oshi-core-3.4.0.jar && \
-  wget https://repo1.maven.org/maven2/net/java/dev/jna/jna/5.4.0/jna-5.4.0.jar \
-  -O /opt/flink/lib/jna-5.4.0.jar && \
-  wget https://repo1.maven.org/maven2/net/java/dev/jna/jna-platform/5.4.0/jna-platform-5.4.0.jar \
-  -O /opt/flink/lib/jna-platform-5.4.0.jar && \
-  $FLINK_HOME/bin/jobmanager.sh start;\
-      while :;
-      do
-        if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
-          then tail -f -n +1 log/*jobmanager*.log;
-        fi;
-      done"]
-  service:
-    type: ClusterIP
-    annotations: {}
-  rest:
-    enabled: true
-
-taskmanager:
-  extraEnvs: {}
-  ports:
-    rpc: 6122
-    metrics: 9999
-  replicaCount: 4
-  numberOfTaskSlots: 9
-  # g for Gigabytes, m for Megabytes
-  heapSize: 6g
-  resources:
-    limits:
-      cpu: 3800m
-      # Gi for Gigabytes, Mi for Megabytes
-      memory: 8000Mi
-  command: ["/bin/bash", "-c", "wget \
-  https://repo1.maven.org/maven2/org/apache/flink/flink-metrics-prometheus_2.12/1.9.1/flink-metrics-prometheus_2.12-1.9.1.jar \
-  -O /opt/flink/lib/flink-metrics-prometheus_2.12-1.9.1.jar && \
-  wget https://repo1.maven.org/maven2/com/github/oshi/oshi-core/3.4.0/oshi-core-3.4.0.jar \
-  -O /opt/flink/lib/oshi-core-3.4.0.jar && \
-  wget https://repo1.maven.org/maven2/net/java/dev/jna/jna/5.4.0/jna-5.4.0.jar \
-  -O /opt/flink/lib/jna-5.4.0.jar && \
-  wget https://repo1.maven.org/maven2/net/java/dev/jna/jna-platform/5.4.0/jna-platform-5.4.0.jar \
-  -O /opt/flink/lib/jna-platform-5.4.0.jar && \
-  $FLINK_HOME/bin/taskmanager.sh start; \
-      while :;
-      do
-        if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
-          then tail -f -n +1 log/*taskmanager*.log;
-        fi;
-      done"]
-  service:
-    type: ClusterIP
-
-ingress:
-  enabled: false
-  annotations: {}
-  path: /
-  hosts: []
-  tls: []
-
-nodeSelector: {}
-
-tolerations:
-  - key: instance
-    operator: Exists
-    effect: NoSchedule
-
-affinity: {}

+ 6 - 0
requirements.lock

@@ -0,0 +1,6 @@
+dependencies:
+- name: zookeeper
+  repository: https://kubernetes-charts-incubator.storage.googleapis.com
+  version: 2.0.0
+digest: sha256:4cf31599082c88db5aa67becd405fadaf609af55690331a02622330ed94da91b
+generated: "2019-12-04T19:24:45.82797+02:00"

+ 6 - 0
requirements.yaml

@@ -0,0 +1,6 @@
+dependencies:
+  - name:       "zookeeper"
+    version:    "2.0.0"
+    repository: "https://kubernetes-charts-incubator.storage.googleapis.com"
+    condition:  "zookeeper.enabled"
+

+ 0 - 0
flink/templates/NOTES.txt → templates/NOTES.txt


+ 26 - 0
flink/templates/_helpers.tpl → templates/_helpers.tpl

@@ -43,3 +43,29 @@ app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
 {{- end }}
 app.kubernetes.io/managed-by: {{ .Release.Service }}
 {{- end -}}
+
+{{/*
+Namespace in which the ServiceMonitor objects are created.
+Resolves to .Values.prometheus.serviceMonitor.namespace when that value
+is non-empty and to .Release.Namespace otherwise.
+*/}}
+{{- define "serviceMonitor.namespace" -}}
+{{ default .Release.Namespace .Values.prometheus.serviceMonitor.namespace }}
+{{- end -}}
+
+{{/*
+Name of the ServiceAccount for the Jobmanager:
+.Values.jobmanager.serviceAccount.name when it is set, otherwise "jobmanager".
+*/}}
+{{- define "jobmanager.serviceAccount" -}}
+{{ default "jobmanager" .Values.jobmanager.serviceAccount.name }}
+{{- end -}}
+
+{{/*
+Name of the ServiceAccount for the Taskmanager:
+.Values.taskmanager.serviceAccount.name when it is set, otherwise "taskmanager".
+*/}}
+{{- define "taskmanager.serviceAccount" -}}
+{{ default "taskmanager" .Values.taskmanager.serviceAccount.name }}
+{{- end -}}

+ 28 - 6
flink/templates/configmap.yaml → templates/configmap.yaml

@@ -8,15 +8,15 @@ data:
   flink-conf.yaml: |+
     {{- /* Must match the Jobmanager Service name, which is "<fullname>-jobmanager". */}}
     jobmanager.rpc.address: {{ include "flink.fullname" . }}-jobmanager
     taskmanager.numberOfTaskSlots: {{ .Values.taskmanager.numberOfTaskSlots }}
-    blob.server.port: 6124
-    jobmanager.rpc.port: 6123
-    taskmanager.rpc.port: 6122
+    blob.server.port: {{ .Values.jobmanager.ports.blob }}
+    jobmanager.rpc.port: {{ .Values.jobmanager.ports.rpc }}
+    taskmanager.rpc.port: {{ .Values.taskmanager.ports.rpc }}
     jobmanager.heap.size: {{ .Values.jobmanager.heapSize }}
     taskmanager.heap.size: {{ .Values.taskmanager.heapSize }}
     {{- if .Values.flink.monitoring.enabled }}
     metrics.reporters: prom
     metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
-    metrics.reporter.prom.port: 9999
+    metrics.reporter.prom.port: {{ .Values.flink.monitoring.port }}
       {{- if .Values.flink.monitoring.system.enabled }}
     metrics.system-resource: true
     metrics.system-resource-probing-interval: {{ .Values.flink.monitoring.system.probingInterval }}
@@ -25,10 +25,26 @@ data:
     metrics.latency.interval: {{ .Values.flink.monitoring.latency.probingInterval }}
       {{- end }}
     {{- end }}
+    {{- if .Values.flink.state.backend }}
+    state.backend: {{ .Values.flink.state.backend }}
+    {{- .Values.flink.state.params | nindent 4 }}
+      {{- if eq .Values.flink.state.backend "rocksdb" }}
+    {{- .Values.flink.state.rocksdb | nindent 4 }}
+      {{- end }}
+    {{- end }}
+    {{- if .Values.jobmanager.highAvailability.enabled }}
+    high-availability: zookeeper
+    high-availability.zookeeper.quorum: {{ .Values.jobmanager.highAvailability.zookeeperConnect }}
+    high-availability.zookeeper.path.root: /flink
+    high-availability.cluster-id: /flink
+    high-availability.storageDir: {{ .Values.jobmanager.highAvailability.storageDir }}
+    {{- end }}
   log4j.properties: |+
     log4j.rootLogger=INFO, file
-    log4j.logger.akka=INFO
-    log4j.logger.org.apache.kafka=INFO
+    log4j.logger.akka=WARN
+    log4j.logger.org.apache.kafka=WARN
+    log4j.logger.org.apache.kafka.clients.producer.ProducerConfig=WARN
+    log4j.logger.org.apache.kafka.clients.consumer.ConsumerConfig=WARN
     log4j.logger.org.apache.hadoop=INFO
     log4j.logger.org.apache.zookeeper=INFO
     log4j.appender.file=org.apache.log4j.FileAppender
@@ -36,3 +52,9 @@ data:
     log4j.appender.file.layout=org.apache.log4j.PatternLayout
     log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
     log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
+{{- if .Values.jobmanager.highAvailability.enabled }}
+  masters: |
+  {{- range $i, $e := until (.Values.jobmanager.replicaCount | int) }}
+    {{ include "flink.fullname" $ }}-jobmanager-{{ $i }}.flink-jobmanager-headless.{{ $.Release.Namespace }}.svc:{{ $.Values.jobmanager.ports.ui }}
+  {{- end }}
+{{- end }}

+ 0 - 0
flink/templates/ingress.yaml → templates/ingress.yaml


+ 30 - 0
templates/jobmanager-headless-service.yaml

@@ -0,0 +1,30 @@
+apiVersion: v1
+kind: Service
+metadata:
+  name: flink-jobmanager-headless
+  labels:
+{{ include "flink.labels" . | indent 4 }}
+    component: jobmanager
+{{- if .Values.jobmanager.service.headless.annotations }}
+  annotations:
+{{ toYaml .Values.jobmanager.service.headless.annotations | indent 4 }}
+{{- end }}
+spec:
+  clusterIP: None
+  ports:
+    {{- range $name, $port := .Values.jobmanager.ports }}
+    - port: {{ $port }}
+      targetPort: {{ $name }}
+      protocol: TCP
+      name: {{ $name }}
+    {{- end }}
+    {{- if .Values.flink.monitoring.enabled }}
+    - port: {{ .Values.flink.monitoring.port }}
+      targetPort: metrics
+      protocol: TCP
+      name: metrics
+    {{- end }}
+  selector:
+    app.kubernetes.io/name: {{ include "flink.name" . }}
+    app.kubernetes.io/instance: {{ .Release.Name }}
+    component: jobmanager

+ 4 - 4
flink/templates/service-jobmanager-rest.yaml → templates/jobmanager-rest-service.yaml

@@ -1,14 +1,14 @@
-{{ if .Values.jobmanager.rest.enabled }}
+{{ if .Values.jobmanager.service.rest.enabled }}
 apiVersion: v1
 kind: Service
 metadata:
-  name: flink-jobmanager-ui
+  name: {{ include "flink.fullname" . }}-jobmanager-rest
   labels:
 {{ include "flink.labels" . | indent 4 }}
     component: jobmanager
-{{- if .Values.jobmanager.service.annotations }}
+{{- if .Values.jobmanager.service.rest.annotations }}
   annotations:
-{{ toYaml .Values.jobmanager.service.annotations | indent 4 }}
+{{ toYaml .Values.jobmanager.service.rest.annotations | indent 4 }}
 {{- end }}
 spec:
   type: ClusterIP

+ 5 - 1
flink/templates/service-jobmanager.yaml → templates/jobmanager-service.yaml

@@ -1,10 +1,14 @@
 apiVersion: v1
 kind: Service
 metadata:
-  name: flink-jobmanager
+  name: {{ include "flink.fullname" . }}-jobmanager
   labels:
 {{ include "flink.labels" . | indent 4 }}
     component: jobmanager
+{{- if .Values.jobmanager.service.annotations }}
+  annotations:
+{{ toYaml .Values.jobmanager.service.annotations | indent 4 }}
+{{- end }}
 spec:
   type: {{ .Values.jobmanager.service.type }}
   ports:

+ 42 - 25
flink/templates/statefulset-jobmanager.yaml → templates/jobmanager-statefulset.yaml

@@ -6,10 +6,12 @@ metadata:
 {{ include "flink.labels" . | indent 4 }}
     component: jobmanager
   annotations:
-    "cluster-autoscaler.kubernetes.io/safe-to-evict": "false"
+  {{- range $key, $value := .Values.jobmanager.annotations }}
+    {{ $key | quote }}: {{ $value | quote }}
+  {{- end }}
 spec:
   replicas: {{ .Values.jobmanager.replicaCount }}
-  podManagementPolicy: Parallel
+  podManagementPolicy: {{ .Values.jobmanager.podManagementPolicy }}
   selector:
     matchLabels:
       app.kubernetes.io/name: {{ include "flink.name" . }}
@@ -26,48 +28,44 @@ spec:
       imagePullSecrets:
         {{- toYaml . | nindent 8 }}
     {{- end }}
+      # serviceAccountName is the supported field; serviceAccount is a deprecated alias.
+      serviceAccountName: {{ include "jobmanager.serviceAccount" . }}
       containers:
         - name: jobmanager
           image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
           imagePullPolicy: {{ .Values.image.pullPolicy }}
           workingDir: {{ .Values.flink.workDir }}
           command:
-          {{- range .Values.jobmanager.command }}
+          {{- with .Values.jobmanager.command }}
             - {{ . | quote }}
           {{- end }}
           env:
-          # Redis access
-            - name: REDIS_HOST
-              value: {{ .Release.Namespace }}-redis-master
-            - name: REDIS_PORT
-              value: "6379"
-            - name: REDIS_PASSWORD
-              valueFrom:
-                secretKeyRef:
-                  name: {{ .Release.Namespace }}-redis
-                  key: redis-password
-          #
-          {{- range $key, $value := .Values.jobmanager.extraEnvs }}
-            - name: {{ $key }}
-              value: {{ $value }}
+          {{- if .Values.extraEnvs }}
+          {{- toYaml .Values.extraEnvs | nindent 12 }}
           {{- end }}
-          {{- range $key, $value := .Values.extraEnvs }}
-            - name: {{ $key }}
-              value: {{ $value }}
+          {{- if .Values.jobmanager.extraEnvs }}
+          {{- toYaml .Values.jobmanager.extraEnvs | nindent 12 }}
           {{- end }}
           ports:
           {{- range $name, $port := .Values.jobmanager.ports }}
             - containerPort: {{ $port }}
               name: {{ $name }}
           {{- end }}
+          {{- if .Values.flink.monitoring.enabled }}
+            - containerPort: {{ .Values.flink.monitoring.port }}
+              name: metrics
+          {{- end }}
           livenessProbe:
             tcpSocket:
               port: {{ .Values.jobmanager.ports.rpc }}
-            initialDelaySeconds: 30
-            periodSeconds: 60
+            initialDelaySeconds: {{ .Values.jobmanager.livenessProbe.initialDelaySeconds }}
+            periodSeconds: {{ .Values.jobmanager.livenessProbe.periodSeconds }}
           volumeMounts:
             - name: flink-config-volume
               mountPath: {{ .Values.flink.workDir }}/conf
+          {{- if .Values.jobmanager.persistent.enabled }}
+            - name: jobmanager-data
+              mountPath: {{ .Values.jobmanager.persistent.mountPath }}
+          {{- end }}
           resources:
             {{- toYaml .Values.jobmanager.resources | nindent 12 }}
 
@@ -80,16 +78,35 @@ spec:
                 path: flink-conf.yaml
               - key: log4j.properties
                 path: log4j.properties
+              {{- if .Values.jobmanager.highAvailability.enabled }}
+              - key: masters
+                path: masters
+              {{- end }}
 
-      {{- with .Values.nodeSelector }}
+      {{- with .Values.jobmanager.nodeSelector }}
       nodeSelector:
         {{- toYaml . | nindent 8 }}
       {{- end }}
-    {{- with .Values.affinity }}
+    {{- with .Values.jobmanager.affinity }}
       affinity:
         {{- toYaml . | nindent 8 }}
     {{- end }}
-    {{- with .Values.tolerations }}
+    {{- with .Values.jobmanager.tolerations }}
       tolerations:
         {{- toYaml . | nindent 8 }}
     {{- end }}
+{{- if .Values.jobmanager.persistent.enabled }}
+  # Per-replica PersistentVolumeClaim mounted by the jobmanager container as "jobmanager-data".
+  volumeClaimTemplates:
+    - metadata:
+        name: jobmanager-data
+        labels:
+{{ include "flink.labels" . | indent 10 }}
+      spec:
+        # storageClassName replaces the obsolete volume.alpha.kubernetes.io/storage-class
+        # annotation; when no class is configured the cluster default is used.
+        {{- with .Values.jobmanager.persistent.storageClass }}
+        storageClassName: {{ . | quote }}
+        {{- end }}
+        accessModes:
+          - ReadWriteOnce
+        resources:
+          requests:
+            storage: {{ .Values.jobmanager.persistent.size }}
+{{- end }}

+ 18 - 0
templates/serviceaccounts.yaml

@@ -0,0 +1,18 @@
+{{- if .Values.jobmanager.serviceAccount.create }}
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: {{ include "jobmanager.serviceAccount" . }}
+  labels:
+{{ include "flink.labels" . | indent 4 }}
+{{- end }}
+---
+
+{{- if .Values.taskmanager.serviceAccount.create }}
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: {{ include "taskmanager.serviceAccount" . }}
+  labels:
+{{ include "flink.labels" . | indent 4 }}
+{{- end }}

+ 10 - 8
flink/templates/prometheus-service-monitors.yaml → templates/servicemonitors.yaml

@@ -1,3 +1,4 @@
+{{- if .Values.prometheus.serviceMonitor.enabled }}
 ---
 apiVersion: monitoring.coreos.com/v1
 kind: ServiceMonitor
@@ -5,11 +6,11 @@ metadata:
   name: 'flink-jobmanager'
   labels:
 {{ include "flink.labels" . | indent 4 }}
-    app: prometheus
-    prometheusApp: prometheus-local
+{{ toYaml .Values.prometheus.serviceMonitor.selector | indent 4 }}
+  namespace: {{ include "serviceMonitor.namespace" . }}
 spec:
   endpoints:
-    - interval: 5s
+    - interval: {{ .Values.prometheus.serviceMonitor.interval }}
       path: /
       port: metrics
   namespaceSelector:
@@ -20,7 +21,7 @@ spec:
     - component
   selector:
     matchLabels:
-      app.kubernetes.io/name: flink
+      app.kubernetes.io/name: {{ include "flink.name" . }}
       component: jobmanager
 
 ---
@@ -30,11 +31,11 @@ metadata:
   name: 'flink-taskmanager'
   labels:
 {{ include "flink.labels" . | indent 4 }}
-    app: prometheus
-    prometheusApp: prometheus-local
+{{ toYaml .Values.prometheus.serviceMonitor.selector | indent 4 }}
+  namespace: {{ include "serviceMonitor.namespace" . }}
 spec:
   endpoints:
-    - interval: 5s
+    - interval: {{ .Values.prometheus.serviceMonitor.interval }}
       path: /
       port: metrics
   namespaceSelector:
@@ -45,5 +46,6 @@ spec:
     - component
   selector:
     matchLabels:
-      app.kubernetes.io/name: flink
+      app.kubernetes.io/name: {{ include "flink.name" . }}
       component: taskmanager
+{{- end }}

+ 11 - 1
flink/templates/service-taskmanager.yaml → templates/taskmanager-service.yaml

@@ -1,10 +1,14 @@
 apiVersion: v1
 kind: Service
 metadata:
-  name: flink-taskmanager
+  name: {{ include "flink.fullname" . }}-taskmanager
   labels:
 {{ include "flink.labels" . | indent 4 }}
     component: taskmanager
+{{- if .Values.taskmanager.service.annotations }}
+  annotations:
+{{ toYaml .Values.taskmanager.service.annotations | indent 4 }}
+{{- end }}
 spec:
   type: {{ .Values.taskmanager.service.type }}
   ports:
@@ -14,6 +18,12 @@ spec:
       protocol: TCP
       name: {{ $name }}
     {{- end }}
+    {{- if .Values.flink.monitoring.enabled }}
+    - port: {{ .Values.flink.monitoring.port }}
+      targetPort: metrics
+      protocol: TCP
+      name: metrics
+    {{- end }}
   selector:
     app.kubernetes.io/name: {{ include "flink.name" . }}
     app.kubernetes.io/instance: {{ .Release.Name }}

+ 41 - 24
flink/templates/statefulset-taskmanager.yaml → templates/taskmanager-statefulset.yaml

@@ -6,10 +6,12 @@ metadata:
 {{ include "flink.labels" . | indent 4 }}
     component: taskmanager
   annotations:
-    "cluster-autoscaler.kubernetes.io/safe-to-evict": "false"
+  {{- range $key, $value := .Values.taskmanager.annotations }}
+    {{ $key | quote }}: {{ $value | quote }}
+  {{- end }}
 spec:
   replicas: {{ .Values.taskmanager.replicaCount }}
-  podManagementPolicy: Parallel
+  podManagementPolicy: {{ .Values.taskmanager.podManagementPolicy }}
   selector:
     matchLabels:
       app.kubernetes.io/name: {{ include "flink.name" . }}
@@ -26,6 +28,7 @@ spec:
       imagePullSecrets:
         {{- toYaml . | nindent 8 }}
     {{- end }}
+      # serviceAccountName is the supported field; serviceAccount is a deprecated alias.
+      serviceAccountName: {{ include "taskmanager.serviceAccount" . }}
       containers:
         - name: taskmanager
           image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
@@ -36,38 +39,33 @@ spec:
             - {{ . | quote }}
           {{- end }}
           env:
-          # Redis access
-            - name: REDIS_HOST
-              value: {{ .Release.Namespace }}-redis-master
-            - name: REDIS_PORT
-              value: "6379"
-            - name: REDIS_PASSWORD
-              valueFrom:
-                secretKeyRef:
-                  name: {{ .Release.Namespace }}-redis
-                  key: redis-password
-          #
-          {{- range $key, $value := .Values.taskmanager.extraEnvs }}
-            - name: {{ $key }}
-              value: {{ $value }}
+          {{- if .Values.extraEnvs }}
+          {{- toYaml .Values.extraEnvs | nindent 12 }}
           {{- end }}
-          {{- range $key, $value := .Values.extraEnvs }}
-            - name: {{ $key }}
-              value: {{ $value }}
+          {{- /* Taskmanager-specific env entries: guard on the same value that is rendered. */}}
+          {{- if .Values.taskmanager.extraEnvs }}
+          {{- toYaml .Values.taskmanager.extraEnvs | nindent 12 }}
           {{- end }}
           ports:
           {{- range $name, $port := .Values.taskmanager.ports }}
             - containerPort: {{ $port }}
               name: {{ $name }}
           {{- end }}
+          {{- if .Values.flink.monitoring.enabled }}
+            - containerPort: {{ .Values.flink.monitoring.port }}
+              name: metrics
+          {{- end }}
           livenessProbe:
             tcpSocket:
               port: {{ .Values.taskmanager.ports.rpc }}
-            initialDelaySeconds: 30
-            periodSeconds: 60
+            initialDelaySeconds: {{ .Values.taskmanager.livenessProbe.initialDelaySeconds }}
+            periodSeconds: {{ .Values.taskmanager.livenessProbe.periodSeconds }}
           volumeMounts:
             - name: flink-config-volume
               mountPath: {{ .Values.flink.workDir }}/conf
+          {{- if .Values.taskmanager.persistent.enabled }}
+            - name: taskmanager-data
+              mountPath: {{ .Values.taskmanager.persistent.mountPath }}
+          {{- end }}
           resources:
             {{- toYaml .Values.taskmanager.resources | nindent 12 }}
 
@@ -80,16 +78,35 @@ spec:
                 path: flink-conf.yaml
               - key: log4j.properties
                 path: log4j.properties
+              {{- if .Values.jobmanager.highAvailability.enabled }}
+              - key: masters
+                path: masters
+              {{- end }}
 
-      {{- with .Values.nodeSelector }}
+      {{- with .Values.taskmanager.nodeSelector }}
       nodeSelector:
         {{- toYaml . | nindent 8 }}
       {{- end }}
-    {{- with .Values.affinity }}
+    {{- with .Values.taskmanager.affinity }}
       affinity:
         {{- toYaml . | nindent 8 }}
     {{- end }}
-    {{- with .Values.tolerations }}
+    {{- with .Values.taskmanager.tolerations }}
       tolerations:
         {{- toYaml . | nindent 8 }}
     {{- end }}
+{{- if .Values.taskmanager.persistent.enabled }}
+  # Per-replica PersistentVolumeClaim mounted by the taskmanager container as "taskmanager-data".
+  volumeClaimTemplates:
+    - metadata:
+        name: taskmanager-data
+        labels:
+{{ include "flink.labels" . | indent 10 }}
+      spec:
+        # storageClassName replaces the obsolete volume.alpha.kubernetes.io/storage-class
+        # annotation; when no class is configured the cluster default is used.
+        {{- with .Values.taskmanager.persistent.storageClass }}
+        storageClassName: {{ . | quote }}
+        {{- end }}
+        accessModes:
+          - ReadWriteOnce
+        resources:
+          requests:
+            storage: {{ .Values.taskmanager.persistent.size }}
+{{- end }}

+ 219 - 0
values.yaml

@@ -0,0 +1,219 @@
+# Default values for flink.
+# This is a YAML-formatted file.
+# Declare variables to be passed into your templates.
+
+nameOverride: ""
+fullnameOverride: ""
+
+image:
+  repository: flink
+  tag: 1.9.1-scala_2.12
+  pullPolicy: IfNotPresent
+imagePullSecrets: []
+
+# For general configuration
+flink:
+  # monitoring is exporting metrics in Prometheus format
+  monitoring:
+    enabled: true
+    # port for metrics
+    port: 9999
+    # latency monitoring
+    latency:
+      enabled: false
+      probingInterval: 1000
+    # system is additional system metrics
+    system:
+      enabled: true
+      probingInterval: 5000
+  workDir: /opt/flink
+  state:
+    # backend for state. Available options: filesystem, rocksdb, memory; empty - for default(memory)
+    backend:
+    # These are the Flink default values, except for the file paths
+    # https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#related-config-options
+    params: |+
+      state.checkpoints.dir: file:///flink_state/checkpoints
+      state.savepoints.dir: file:///flink_state/savepoints
+      state.backend.async: true
+      state.backend.fs.memory-threshold: 1024
+      state.backend.fs.write-buffer-size: 4096
+      state.backend.incremental: false
+      state.backend.local-recovery: false
+      state.checkpoints.num-retained: 1
+      taskmanager.state.local.root-dirs: file:///flink_state/local-recovery
+    # https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#rocksdb-state-backend-config-options
+    # Note: state.backend.rocksdb.localdir is a plain path, without the file:// prefix
+    rocksdb: |+
+      state.backend.rocksdb.checkpoint.transfer.thread.num: 1
+      state.backend.rocksdb.localdir: /flink_state/rocksdb
+      state.backend.rocksdb.options-factory: org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory
+      state.backend.rocksdb.predefined-options: DEFAULT
+      state.backend.rocksdb.timer-service.factory: HEAP
+      state.backend.rocksdb.ttl.compaction.filter.enabled: false
+
+# extraEnvs passes envs to both Jobmanagers and Taskmanager
+# for example
+# extraEnvs:
+#  - name: KAFKA_BOOTSTRAP_SERVERS
+#    value: dest-kafka-bootstrap:9092
+#
+extraEnvs: []
+
+jobmanager:
+  # highAvailability configuration based on zookeeper
+  highAvailability:
+    # enabling this also enables the ZooKeeper dependency
+    enabled: false
+    zookeeperConnect: zookeeper:2181
+    # storageDir for Jobmanagers. DFS expected.
+    # Docs - Storage directory (required): JobManager metadata is persisted in the file system storageDir and only a pointer to this state is stored in ZooKeeper
+    storageDir:
+  # extraEnvs passes envs to Jobmanagers
+  extraEnvs: []
+  ports:
+    rpc: 6123
+    blob: 6124
+    ui: 8081
+  replicaCount: 1
+  # heapSize params for Jobmanager
+  # keep in mind that Flink can use offheap memory
+  # e.g. in case of checkpoint usage
+  heapSize: 1g
+  resources: {}
+# Example
+#    limits:
+#      cpu: 3800m
+#      memory: 8000Mi
+  command: ["/bin/bash", "-c", "cp \
+  /opt/flink/opt/flink-metrics-prometheus-1.9.1.jar \
+  /opt/flink/opt/flink-s3-fs-hadoop-1.9.1.jar \
+  /opt/flink/lib/ && \
+  wget https://repo1.maven.org/maven2/com/github/oshi/oshi-core/3.4.0/oshi-core-3.4.0.jar \
+  -O /opt/flink/lib/oshi-core-3.4.0.jar && \
+  wget https://repo1.maven.org/maven2/net/java/dev/jna/jna/5.4.0/jna-5.4.0.jar \
+  -O /opt/flink/lib/jna-5.4.0.jar && \
+  wget https://repo1.maven.org/maven2/net/java/dev/jna/jna-platform/5.4.0/jna-platform-5.4.0.jar \
+  -O /opt/flink/lib/jna-platform-5.4.0.jar && \
+  $FLINK_HOME/bin/jobmanager.sh start;\
+      while :;
+      do
+        if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
+          then tail -f -n +1 log/*jobmanager*.log;
+        fi;
+      done"]
+  service:
+    type: ClusterIP
+    annotations: {}
+    # rest is an additional service which exposes only the HTTP port;
+    # it can be used in setups that rely on exposeController
+    rest:
+      enabled: true
+      annotations: {}
+    headless:
+      annotations: {}
+  # Pod scheduling constraints for the Jobmanager pods.
+  # nodeSelector and affinity are maps; tolerations is a list of toleration
+  # objects, so its empty default is [] rather than {}.
+  nodeSelector: {}
+  affinity: {}
+  tolerations: []
+  persistent:
+    enabled: false
+    storageClass:
+    size: 8Gi
+    mountPath: "/flink_state"
+  podManagementPolicy: Parallel
+  annotations: {}
+  # Example
+  #  "cluster-autoscaler.kubernetes.io/safe-to-evict": "false"
+  serviceAccount:
+    # Specifies whether a ServiceAccount should be created
+    create: true
+    # The name of the ServiceAccount to use.
+    # If not set and create is true, a name is generated using the fullname template
+    name:
+  # livenessProbe checks the rpc port with a tcpSocket probe
+  livenessProbe:
+    initialDelaySeconds: 30
+    periodSeconds: 60
+
+taskmanager:
+  # extraEnvs passes envs to Taskmanagers
+  extraEnvs: []
+  ports:
+    rpc: 6122
+  replicaCount: 4
+  numberOfTaskSlots: 1
+  heapSize: 1g
+  resources: {}
+# Example
+#    limits:
+#      cpu: 3800m
+#      memory: 8000Mi
+  command: ["/bin/bash", "-c", "cp \
+  /opt/flink/opt/flink-metrics-prometheus-1.9.1.jar \
+  /opt/flink/opt/flink-s3-fs-hadoop-1.9.1.jar \
+  /opt/flink/lib/ && \
+  wget https://repo1.maven.org/maven2/com/github/oshi/oshi-core/3.4.0/oshi-core-3.4.0.jar \
+  -O /opt/flink/lib/oshi-core-3.4.0.jar && \
+  wget https://repo1.maven.org/maven2/net/java/dev/jna/jna/5.4.0/jna-5.4.0.jar \
+  -O /opt/flink/lib/jna-5.4.0.jar && \
+  wget https://repo1.maven.org/maven2/net/java/dev/jna/jna-platform/5.4.0/jna-platform-5.4.0.jar \
+  -O /opt/flink/lib/jna-platform-5.4.0.jar && \
+  $FLINK_HOME/bin/taskmanager.sh start; \
+      while :;
+      do
+        if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
+          then tail -f -n +1 log/*taskmanager*.log;
+        fi;
+      done"]
+  service:
+    type: ClusterIP
+  # Pod scheduling constraints for the Taskmanager pods.
+  # nodeSelector and affinity are maps; tolerations is a list of toleration
+  # objects, so its empty default is [] rather than {}.
+  nodeSelector: {}
+  affinity: {}
+  tolerations: []
+  persistent:
+    enabled: false
+    storageClass:
+    size: 8Gi
+    mountPath: "/flink_state"
+  podManagementPolicy: Parallel
+  annotations:
+    "cluster-autoscaler.kubernetes.io/safe-to-evict": "false"
+  serviceAccount:
+    # Specifies whether a ServiceAccount should be created
+    create: true
+    # The name of the ServiceAccount to use.
+    # If not set and create is true, a name is generated using the fullname template
+    name:
+  # livenessProbe checks the rpc port with a tcpSocket probe
+  livenessProbe:
+    initialDelaySeconds: 30
+    periodSeconds: 60
+
+ingress:
+  enabled: false
+  annotations: {}
+  path: /
+  hosts: []
+  tls: []
+
+prometheus:
+  # serviceMonitor provides service discovery for Prometheus Operator installations
+  serviceMonitor:
+    enabled: true
+    namespace:
+    interval: 5s
+    selector:
+      # According to default selector for prometheus operator
+      prometheus: kube-prometheus
+
+zookeeper:
+  enabled: false
+  replicaCount: 3
+  env:
+    ZK_HEAP_SIZE: "1G"
+  resources:
+    limits:
+      cpu: 400m
+      memory: 1256Mi
+  persistence:
+    enabled: true