KEP-3715: Elastic Indexed Job

Implementation History
STABLE Implemented
Created 2023-01-10
Latest v1.31
Milestones
Beta 1.27
Stable v1.31
Ownership
Owning SIG
SIG Apps
Primary Authors

Release Signoff Checklist

Items marked with (R) are required prior to targeting to a milestone / release.

  • (R) Enhancement issue in release milestone, which links to KEP dir in kubernetes/enhancements (not the initial KEP PR)
  • (R) KEP approvers have approved the KEP status as implementable
  • (R) Design details are appropriately documented
  • (R) Test plan is in place, giving consideration to SIG Architecture and SIG Testing input (including test refactors)
    • e2e Tests for all Beta API Operations (endpoints)
    • (R) Ensure GA e2e tests meet requirements for Conformance Tests
    • (R) Minimum Two Week Window for GA e2e tests to prove flake free
  • (R) Graduation criteria is in place
  • (R) Production readiness review completed
  • (R) Production readiness review approved
  • “Implementation History” section is up-to-date for milestone
  • User-facing documentation has been created in kubernetes/website, for publication to kubernetes.io
  • Supporting documentation—e.g., additional design documents, links to mailing list discussions/SIG meetings, relevant PRs/issues, release notes

Summary

Currently, spec.completions is an immutable field in Job for both Indexed and NonIndexed modes. This KEP proposes to allow mutating spec.completions for an Indexed Job if and only if spec.completions equals spec.parallelism both before and after the update, meaning that spec.completions is mutable only in tandem with spec.parallelism.

Motivation

There are cases where a batch workload requires an autoscaled Indexed Job with stable DNS pod names, for example Horovod with MPI, Ray, and PyTorch. This is currently not possible because spec.completions, which controls the range of indexes in an Indexed Job, is immutable. While such workloads could be modeled as a StatefulSet, the Job API is a better fit because it offers batch-specific features such as allowing indexes to run to completion, and pod and container failure handling.

In the above mentioned cases, there is no distinction between the concept of “how many completed indexes do I need” (i.e., spec.completions) and the concept of “how many pods do I want running at a time” (i.e., spec.parallelism). All indexes are expected to start at the same time, but they can finish at different times.

The above behavior can be modeled using an Indexed job with

  • spec.completions equal to spec.parallelism
  • mutable spec.completions and spec.parallelism as long as they are mutated together.

Goals

  • Allow mutating spec.completions for Indexed job iff the updated value is equal to spec.parallelism.

Non-Goals

  • Change the existing behavior of NonIndexed mode.
  • Change the existing behavior of Indexed mode for jobs that never mutate .spec.completions and .spec.parallelism together.

Proposal

The proposal is to allow mutating spec.completions for an Indexed Job when it is equal to, and updated in tandem with, spec.parallelism.

Success and failure semantics are not changed for jobs that never mutate spec.completions; however, for ones that do, the semantics are changed in the following manner:

Failures will always count. If spec.completions was scaled down, then previous pod failures outside the new range will still count against the job’s backoffLimit. Moreover, the field status.Failed will not be decremented if previously failed pods are now outside the range. However, status.Succeeded will be updated to reflect the number of successful indexes within the range as defined below.

If an index completes successfully, the job is then scaled down so that the index falls out of range, and the job is later scaled up so that the index is back in range, then the index will be restarted.
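
The counting rules above can be illustrated with a small toy model. This is only a sketch and not the job controller: the class `ElasticJobModel` and its methods are hypothetical names, and forgetting out-of-range successes on scale-down is a modeling assumption chosen to reproduce the restart behavior described above.

```python
class ElasticJobModel:
    """Toy model of the success/failure counting rules; not the job controller."""

    def __init__(self, size: int) -> None:
        self.size = size                # spec.completions == spec.parallelism
        self.succeeded_indexes = set()  # indexes that completed within the range
        self.failed = 0                 # mirrors status.Failed: never decremented

    def record_success(self, index: int) -> None:
        if 0 <= index < self.size:
            self.succeeded_indexes.add(index)

    def record_failure(self, index: int) -> None:
        if 0 <= index < self.size:
            self.failed += 1

    def scale(self, new_size: int) -> None:
        # Assumption: successes outside the new range are forgotten, so a
        # later scale-up runs those indexes again from scratch.
        self.succeeded_indexes = {i for i in self.succeeded_indexes if i < new_size}
        self.size = new_size

    @property
    def succeeded(self) -> int:
        # Mirrors status.Succeeded: successful indexes within the current range.
        return len(self.succeeded_indexes)


job = ElasticJobModel(4)
job.record_success(0)
job.record_success(3)
job.record_failure(2)
job.scale(2)  # index 3 succeeded and index 2 failed; both are now out of range
job.scale(4)  # index 3 is back in range but must run again
```

In this model, after the scale-down `job.succeeded` drops to 1 while `job.failed` stays at 1, and scaling back up does not bring the earlier success of index 3 back.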

User Stories (Optional)

Story 1

I have a batch workload that requires two sets of pods: a manager and a group of workers that can scale up or down. The manager communicates with the workers to manage their load; examples of such workloads include distributed HPC and distributed ML training. Examples of frameworks include Horovod with MPI, Spark and Ray.

This workload can be modeled as two Indexed Jobs, one for the manager and one for the workers. A headless service is created to set up stable hostnames for the worker pods. The workers job is scaled up/down by updating spec.completions and spec.parallelism in tandem.
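
As a rough illustration of updating both fields in tandem, the sketch below builds a single patch body that sets spec.completions and spec.parallelism to the same value. The helper name `scale_patch` is hypothetical; only the two field names come from this KEP.

```python
import json


def scale_patch(replicas: int) -> dict:
    """Build a merge-patch body that sets both fields to the same value."""
    if replicas < 0:
        raise ValueError("replicas must be non-negative")
    # Both fields change in one request, so they are equal before and after.
    return {"spec": {"completions": replicas, "parallelism": replicas}}


# Serialized form of the patch for scaling the workers job to 5 pods.
body = json.dumps(scale_patch(5))
```

A body like this could be applied with a merge patch, for example `kubectl patch job workers --type=merge -p "$BODY"`, where `workers` is an assumed Job name and `BODY` holds the serialized patch.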

The success semantics of these workloads are tied to the manager, not the workers. However, in such workloads, when the job is finished the workers are either deleted directly (effectively scaled down to 0) or they potentially exit on their own after communicating their final status to the manager who decides the final status of the whole workload.

Risks and Mitigations

There is a concern that changing completion semantics on the fly can be subject to race conditions with declaring the job finished. However, after a closer look, this seems to not be an issue because the job controller uses Update when setting the finished status on the job object, which is subject to object modified errors if the spec changes at the same time. Moreover, spec.completions will be immutable once a job has a finished condition.
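
The conflict behavior this argument relies on can be sketched with a minimal in-memory model of optimistic concurrency. `JobStore` and `Conflict` are hypothetical names, and the flat dictionary is a simplified stand-in for a Job object; the real API server and job controller are considerably more involved.

```python
class Conflict(Exception):
    """Raised when an update carries a stale resourceVersion."""


class JobStore:
    """Hypothetical in-memory stand-in for the stored state of one Job."""

    def __init__(self, completions: int) -> None:
        self.obj = {"resourceVersion": 1, "completions": completions, "finished": False}

    def get(self) -> dict:
        return dict(self.obj)  # callers work on a copy, as with a real read

    def update(self, changed: dict) -> None:
        # Reject writes based on an outdated view of the object.
        if changed["resourceVersion"] != self.obj["resourceVersion"]:
            raise Conflict("the object has been modified")
        self.obj = {**changed, "resourceVersion": changed["resourceVersion"] + 1}


store = JobStore(completions=3)
controller_view = store.get()  # controller reads the job and decides it finished
user_view = store.get()        # user reads the same version

user_view["completions"] = 5
store.update(user_view)        # the scale-up lands first

controller_view["finished"] = True
try:
    store.update(controller_view)
    conflicted = False
except Conflict:
    conflicted = True          # controller must re-read and re-evaluate
```

Here the stale write that would have marked the job finished is rejected, so the decision is re-evaluated against the scaled-up spec.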

Design Details

The controller already counts the number of completed indexes within the range defined by spec.completions and uses that to evaluate if the job completed successfully and to set the status.Succeeded count. This is not impacted if spec.completions changes over time. Moreover, on scale down, the controller also automatically removes any indexes outside the range.

As mentioned above, pod failures may have happened to indexes that are outside the range after spec.completions is updated. Those failures will still count against the backoffLimit of the job and the field status.Failed will not be decremented.

The only needed change is to modify Job update validation in the kube-apiserver to allow updating an Indexed Job’s spec.completions if spec.completions=spec.parallelism in both the old and new instance of the Job object.
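
The relaxed rule can be summarized in a few lines. The following is a minimal sketch and not the actual kube-apiserver validation code: `JobSpec` is a simplified, hypothetical stand-in for the real type, `completions_update_allowed` and `gate_enabled` are illustrative names, and the additional rule that spec.completions becomes immutable once the job has a finished condition is left out.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class JobSpec:
    # Hypothetical, simplified stand-in for the Job spec fields used here.
    completion_mode: str  # "Indexed" or "NonIndexed"
    completions: Optional[int]
    parallelism: Optional[int]


def completions_update_allowed(old: JobSpec, new: JobSpec, gate_enabled: bool = True) -> bool:
    """Return True if the change to spec.completions passes the relaxed rule."""
    if old.completions == new.completions:
        return True  # completions is unchanged, nothing to check
    if not gate_enabled or old.completion_mode != "Indexed":
        return False  # completions stays immutable, as before this KEP
    if old.completions is None or new.completions is None:
        return False
    # Mutable only in tandem: equal to parallelism in both old and new objects.
    return old.completions == old.parallelism and new.completions == new.parallelism
```

For example, going from completions=3, parallelism=3 to completions=5, parallelism=5 is accepted in this sketch, while changing completions to 5 and leaving parallelism at 3 is rejected.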

Test Plan

[x] I/we understand the owners of the involved components may require updates to existing tests to make this code solid enough prior to committing the changes necessary to implement this enhancement.

Prerequisite testing updates
Unit tests
  • k8s.io/kubernetes/pkg/controller/job: 1/10/2023 - 90%
  • k8s.io/kubernetes/pkg/apis/batch/validation: 1/10/2023 - 96%
Integration tests

For an Indexed Job test that:

  • spec.completions is only mutable if equal to spec.parallelism
  • When scaled down, higher indexes are removed first.
  • When scaled down to zero, the job is marked as finished successfully.
  • When scaled up, the range is extended and new higher indexes are created.
  • Success, failure and completion semantics are as discussed above.
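
The expected index changes on scale down and scale up can be written as a small helper, for instance to derive expectations inside such tests. This is a sketch only: `scale_diff` is a hypothetical name, and listing removed indexes from highest to lowest is one reading of "higher indexes are removed first".

```python
from typing import List, Tuple


def scale_diff(old_size: int, new_size: int) -> Tuple[List[int], List[int]]:
    """Return (removed, created) index lists when the index range changes size."""
    # Scale down: indexes new_size..old_size-1 go away, highest first.
    removed = list(range(old_size - 1, new_size - 1, -1))
    # Scale up: the range is extended with new higher indexes.
    created = list(range(old_size, new_size))
    return removed, created
```

With this helper, `scale_diff(5, 3)` yields removed indexes 4 and 3 and nothing created, while `scale_diff(3, 5)` yields nothing removed and new indexes 3 and 4.
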
e2e tests

Coverage is achieved via integration tests. But we will add one e2e test:

  • Job is allowed to scale down, then up, then successfully finish when all pods in the new range exit with success

Graduation Criteria

Since the change only relaxes validation, doesn’t require multiple releases, and requires no changes to the job controller, we will start the feature in beta and enabled by default, because we would not get a lot of value from an alpha release. This is the same approach we successfully took for mutable scheduling directives.

Note that if tests reveal a required change that invalidates the above understanding, then we will revert and start in Alpha.

Beta

  • Validation logic in place to allow mutating spec.completions in tandem with .spec.parallelism.

GA

  • Fix any potentially reported bugs.

Upgrade / Downgrade Strategy

N/A. When the feature is enabled, mutating completions will be allowed; when it is disabled (via a downgrade or the feature flag), mutating completions will be rejected.

Version Skew Strategy

In a multi-master setup, when the cluster has skewed apiservers, some update requests may get accepted and some may get rejected. This is a transient state until all instances are on the same k8s version.

Production Readiness Review Questionnaire

Feature Enablement and Rollback

The feature can be safely rolled back.

How can this feature be enabled / disabled in a live cluster?
  • Feature gate (also fill in values in kep.yaml)
    • Feature gate name: ElasticIndexedJob
    • Components depending on the feature gate: kube-apiserver
  • Other
    • Describe the mechanism:
    • Will enabling / disabling the feature require downtime of the control plane?
    • Will enabling / disabling the feature require downtime or reprovisioning of a node?
Does enabling the feature change any default behavior?

No.

Can the feature be disabled once it has been enabled (i.e. can we roll back the enablement)?

Yes. If disabled, kube-apiserver will reject mutating requests to spec.completions for Indexed Jobs. For Jobs that were previously mutated, the only implication is that future mutating requests will be rejected.

What happens if we reenable the feature if it was previously rolled back?

The api-server will accept mutation requests to spec.completions for Indexed job.

Are there any tests for feature enablement/disablement?

See Job integration tests

Rollout, Upgrade and Rollback Planning

How can a rollout or rollback fail? Can it impact already running workloads?

The change is opt-in; it doesn’t impact already running workloads.

What specific metrics should inform a rollback?

N/A

Were upgrade and rollback tested? Was the upgrade->downgrade->upgrade path tested?

It was tested manually prior to the beta launch. In addition, since the feature effectively only relaxes validation (and so is purely in-memory), upgrade/downgrade testing boils down to the actual feature tests.

Is the rollout accompanied by any deprecations and/or removals of features, APIs, fields of API types, flags, etc.?

No.

Monitoring Requirements

How can an operator determine if the feature is in use by workloads?

Attempt to mutate spec.completions for Indexed job.

How can someone using this feature know that it is working for their instance?
  • Events
    • Event Reason:
  • API .status
    • Condition name:
    • Other field:
  • Other (treat as last resort)
    • Details: spec.completions for Indexed jobs are successfully mutated according to the semantics described above.
What are the reasonable SLOs (Service Level Objectives) for the enhancement?

N/A

What are the SLIs (Service Level Indicators) an operator can use to determine the health of the service?
  • Metrics
    • Metric name: apiserver_request_total[resource=job, group=batch, verb=UPDATE, code=400]
    • [Optional] Aggregation method:
    • Components exposing the metric:
  • Other (treat as last resort)
    • Details:
Are there any missing metrics that would be useful to have to improve observability of this feature?

Dependencies

Does this feature depend on any specific services running in the cluster?

No.

Scalability

Will enabling / using this feature result in any new API calls?

Not in and of itself.

Will enabling / using this feature result in introducing new API types?

No.

Will enabling / using this feature result in any new calls to the cloud provider?

No.

Will enabling / using this feature result in increasing size or count of the existing API objects?

No.

Will enabling / using this feature result in increasing time taken by any operations covered by existing SLIs/SLOs?

No.

Will enabling / using this feature result in non-negligible increase of resource usage (CPU, RAM, disk, IO, …) in any components?

No.

Troubleshooting

How does this feature react if the API server and/or etcd is unavailable?

Create requests will be rejected.

What are other known failure modes?

In a multi-master setup, when the cluster has skewed apiservers, some update requests may get accepted and some may get rejected.

  • Detection: failed update requests; the metric that an operator can monitor is apiserver_request_total[resource=job, group=batch, verb=UPDATE, code=403]
  • Diagnostics: apiserver logs indicating rejected/accepted job update requests
  • Testing: no testing; this is a transient state until all instances are on the same k8s version.

What steps should be taken if SLOs are not being met to determine the problem?

N/A.

Implementation History

  • 2024-06-12: Graduated to stable.
  • 2023-01-10: Proposed KEP.

Alternatives

Use StatefulSets

Jobs have different success and retry semantics than StatefulSets, which makes them better suited for batch workloads.

Make spec.completions fully mutable

Making spec.completions fully mutable has no clear use case, and it is in theory a larger scope change. Scaling a job is not about how many pods I want completed; it is about how many pods I want active, so tying it to parallelism makes sense.

Allow spec.completions=nil for Indexed mode

spec.completions=nil is a special case that is currently allowed only for NonIndexed mode. The success and completion semantics were designed to address the work queue pattern; specifically, a job is marked as successful if at least one pod exits successfully, all remaining pods complete, and the number of failures does not exceed spec.backoffLimit. Currently spec.completions=nil is not allowed for Indexed mode, and so an alternative is to allow it and use spec.parallelism to define the range of indexes. However, this has the following drawbacks:

  • the success semantics are not compatible with the use cases we are targeting
  • falling back to spec.parallelism to define the range of indexes changes how job completion criteria work for Indexed Jobs.
  • Relaxing validation to allow for a nil value may not be a safe change to API clients that expected completions to not be nil.

While we could change the success criteria to work better for our use case, and we could do the change under a new spec.completionMode to ensure that the change is safe to clients, making spec.completions mutable seems like the clearer API.