Rescaling
Apply the following configuration to enable adaptive rescaling with checkpointing.
Adaptive mode
jobmanager:
  image: here-anonymizer:latest
  environment:
    # ...
    ENABLE_BUILT_IN_PLUGINS: "flink-s3-fs-hadoop-1.19.3.jar"
    FLINK_PROPERTIES: |
      jobmanager.rpc.address: jobmanager
      jobmanager.scheduler: adaptive
      state.backend: filesystem
      state.checkpoints.dir: file:///flink-data/checkpoints
      state.savepoints.dir: file:///flink-data/savepoints
      execution.checkpointing.mode: EXACTLY_ONCE
      execution.checkpointing.interval: 60s
      execution.checkpointing.timeout: 30s
      execution.checkpointing.max-concurrent-checkpoints: 1
      parallelism.default: 4
      s3a.endpoint: http://minio:9000
      s3a.path.style.access: true
      s3a.access-key: minioadmin
      s3a.secret-key: minioadmin
      s3a.endpoint.region: eu-west-1
      fs.s3a.endpoint: minio:9000
      fs.s3a.path.style.access: true
      fs.s3a.connection.ssl.enabled: false
      fs.s3a.access.key: minioadmin
      fs.s3a.secret.key: minioadmin
      fs.s3a.endpoint.region: eu-west-1
      fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
      fs.s3a.signing-algorithm: S3SignerType
      fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
  volumes:
    - ../../../simple-anonymization.conf:/opt/anonymization.conf
    - ./log4j-debug.properties:/opt/flink/conf/log4j-console.properties
taskmanager:
  image: here-anonymizer:latest
  # ...
  scale: 2
  environment:
    # ...
    ENABLE_BUILT_IN_PLUGINS: "flink-s3-fs-hadoop-1.19.3.jar"
    FLINK_PROPERTIES: |
      state.backend: filesystem
      state.checkpoints.dir: s3a://flink-data/checkpoints
      s3a.endpoint: http://minio:9000
      s3a.path.style.access: true
      s3a.access-key: minioadmin
      s3a.secret-key: minioadmin
      s3a.endpoint.region: eu-west-1
      fs.s3a.endpoint: minio:9000
      fs.s3a.path.style.access: true
      fs.s3a.connection.ssl.enabled: false
      fs.s3a.access.key: minioadmin
      fs.s3a.secret.key: minioadmin
      fs.s3a.endpoint.region: eu-west-1
      fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
      fs.s3a.signing-algorithm: S3SignerType
      fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
minio:
  image: docker.artifactory-collab.tool.in.here.com/minio/minio
  ports:
    - "9001:9000" # use API: http://127.0.0.1:9001 (port 9000 used by ZScaler)
    - "9002:9002" # UI console
  environment:
    MINIO_ROOT_USER: minioadmin # at least 3 characters required
    MINIO_ROOT_PASSWORD: minioadmin # at least 8 characters required
    MINIO_SITE_REGION: eu-west-1
  command: server /data --console-address ":9002"
# initialize minio with areas.json
init-minio:
  image: quay.io/minio/minio
  depends_on:
    minio:
      condition: service_started
  entrypoint: [ '/bin/bash', '-x', '-c' ]
  command:
    - |
      until mc alias set myminio http://minio:9000 minioadmin minioadmin; do sleep 1; done
      mc mb myminio/flink-data/checkpoints
      mc mb myminio/flink-data/savepoints
Example
1. From the application directory, build the here-anonymizer:latest image:
     docker build -t here-anonymizer:latest .
2. Run the example:
     sh demo-start.sh
3. Inspect the Flink console at http://localhost:8081:
   - Open Running Jobs -> HERE Anonymizer and verify that the second block of operators shows Parallelism: 2.
   - Open TaskManagers and verify that two TaskManagers are listed.
4. Scale the TaskManager deployment up to three replicas. With one task slot per TaskManager, the parallelism increases to three. Use the following command:
     docker-compose -f ./deployments/docker/demo/docker-compose.yml \
       scale taskmanager=3
5. Scale the TaskManager deployment down to two replicas. With one task slot per TaskManager, the parallelism decreases to two. Use the following command:
     docker-compose -f ./deployments/docker/demo/docker-compose.yml \
       scale taskmanager=2
6. Run this command to stop the example:
     sh demo-stop.sh
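The parallelism values seen while scaling follow from how the adaptive scheduler uses slots: the job runs with as many slots as are available, up to the configured parallelism (parallelism.default: 4 in the configuration above). The sketch below is a simplified model of that rule, not part of the demo scripts. The function name expected_parallelism is hypothetical, and the model assumes all operators share slots (Flink's default slot sharing) and that every TaskManager offers the same number of slots.

```shell
#!/bin/sh
# Simplified model (an assumption, not Flink code): under the adaptive
# scheduler the job parallelism is the number of available task slots,
# capped at the configured parallelism.
expected_parallelism() {
  configured=$1     # parallelism.default from FLINK_PROPERTIES
  taskmanagers=$2   # number of TaskManager replicas
  slots_per_tm=$3   # task slots per TaskManager (one in this example)
  available=$((taskmanagers * slots_per_tm))
  if [ "$available" -lt "$configured" ]; then
    echo "$available"
  else
    echo "$configured"
  fi
}

expected_parallelism 4 2 1   # initial deployment, prints 2
expected_parallelism 4 3 1   # after scaling up to three replicas, prints 3
expected_parallelism 4 6 1   # more slots than parallelism.default, prints 4
```

Under this model, scaling beyond four TaskManagers would not raise the parallelism further unless parallelism.default is also increased. The actual value can be confirmed in the Flink console under Running Jobs.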