ant31

Reputation: 4780

Kubernetes API call equivalent to 'kubectl apply'

I'm trying to use the master API to update resources.

In Kubernetes 1.2, I update a Deployment resource by running kubectl apply -f updateddeployment.yaml

How can I perform the same action through the API?

Upvotes: 32

Views: 14673

Answers (4)

mecampbellsoup

Reputation: 1511

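You could install the kubectl binary and invoke it from within your Python program, piping the manifests to it on standard input (add --prune together with a label selector (-l) if resources that are no longer in the manifests should also be deleted):

import subprocess
subprocess.run(["kubectl", "apply", "-f", "-"], input=yaml_manifests, text=True, check=True)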

Once server-side apply is ready, this problem should get a bit easier, as there will effectively be a Kubernetes API endpoint you can hit. It still doesn't sound like it will be resource-agnostic, though: you will have to PATCH each resource at its own path (e.g. /api/v1/some-k8s-resource), whereas kubectl apply accepts a heterogeneous list of resources.


UPDATE

Kubernetes has since implemented server-side apply, which is stable as of version 1.22.

Upvotes: 4

Reece Jones

Reputation: 21

2023 Solution w/ Server-Side Apply

In newer versions of Kubernetes you can use Server-Side Apply. This method has the benefit of only requiring a single API request and letting the server merge the resource.

Here's an example API request:

PATCH /apis/apps/v1/namespaces/default/deployments/nginx-deployment?fieldManager=stackOverflow HTTP/1.1
Content-Type: application/apply-patch+yaml
Authorization: Bearer <API TOKEN>
Host: <API IP>:<API PORT>
Connection: close
User-Agent: RapidAPI/4.1.4 (Macintosh; OS X/13.2.1) GCDHTTPRequest
Content-Length: 340

apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx-deployment
  labels:
    app: nginx
spec:
  replicas: 1
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - name: nginx
        image: nginx:1.14.2
        ports:
        - containerPort: 80

You can make kubectl use SSA from the command line by supplying the --server-side argument.

$ kubectl apply -f test.yml --v=8 --server-side

You can read more in the Kubernetes documentation on Server-Side Apply.
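As a rough illustration, the PATCH request shown above could be assembled in Python along these lines. This is only a sketch under stated assumptions: the helper names apply_path and build_apply_request are hypothetical, the resource name is guessed by lowercasing the kind and appending "s" (real clients look it up through API discovery), and only namespaced resources are handled. The manifest is sent as JSON, which the apply-patch+yaml content type accepts because JSON is valid YAML.

```python
import json
import urllib.request
from urllib.parse import quote, urlencode


def apply_path(manifest, field_manager, plural=None):
    """Build the Server-Side Apply path for a namespaced object given as a dict."""
    api_version = manifest["apiVersion"]
    # The core group lives under /api/<version>; other groups under /apis/<group>/<version>.
    prefix = "/api/" if "/" not in api_version else "/apis/"
    meta = manifest["metadata"]
    namespace = meta.get("namespace", "default")
    # ASSUMPTION: naive pluralization; pass `plural` explicitly for kinds
    # where lowercase + "s" is wrong (e.g. Ingress -> ingresses).
    resource = plural or manifest["kind"].lower() + "s"
    query = urlencode({"fieldManager": field_manager})
    return (f"{prefix}{api_version}/namespaces/{quote(namespace)}/"
            f"{resource}/{quote(meta['name'])}?{query}")


def build_apply_request(base_url, token, manifest, field_manager):
    """Return an unsent urllib Request that would apply `manifest`."""
    return urllib.request.Request(
        base_url + apply_path(manifest, field_manager),
        data=json.dumps(manifest).encode("utf-8"),  # JSON is valid YAML
        method="PATCH",
        headers={
            "Content-Type": "application/apply-patch+yaml",
            "Authorization": f"Bearer {token}",
        },
    )
```

To apply a heterogeneous list of objects you would build and send one such request per object, for example with urllib.request.urlopen(request, context=ssl_context), where ssl_context trusts the cluster's CA certificate.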

Upvotes: 1

Ichthyo

Reputation: 8369

This API is not really convincingly designed, since it forces us to reimplement such basic stuff at the client side...

Anyway, here is my attempt to reinvent the hexagonal wheel in Python...

Python module kube_apply

Usage is like kube_apply.fromYaml(myStuff)

  • can read strings or opened file streams (via PyYAML)
  • handles YAML files with several concatenated objects
  • the implementation is rather braindead: it first attempts to create the resource. If that fails, it tries a patch, and if that also fails, it deletes the resource and creates it anew.

File: kube_apply.py

#!/usr/bin/python3
# coding: utf-8
# __________   ________________________________________________   #
# kube_apply - apply Yaml similar to kubectl apply -f file.yaml   #
#                                                                 #
# (C) 2019 Hermann Vosseler <[email protected]>                 #
# This is OpenSource software; licensed under Apache License v2+  #
# ############################################################### #
'''
Utility for the official Kubernetes python client: apply Yaml data.
While still limited to some degree, this utility attempts to provide
functionality similar to `kubectl apply -f`
- load and parse Yaml
- try to figure out the object type and API to use
- figure out if the resource already exists, in which case
  it needs to be patched or replaced altogether.
- otherwise just create a new resource.

Based on inspiration from `kubernetes/utils/create_from_yaml.py`

@since: 2/2019
@author: Ichthyostega
'''


import re
import yaml
import logging

import kubernetes.client



def runUsageExample():
    ''' demonstrate usage by creating a simple Pod through default client
    '''
    logging.basicConfig(level=logging.DEBUG)
    #
#   KUBECONFIG = '/path/to/special/kubecfg.yaml'
#   import kubernetes.config
#   client = kubernetes.config.new_client_from_config(config_file=KUBECONFIG)
#   # --or alternatively--
#   kubernetes.config.load_kube_config(config_file=KUBECONFIG)


    fromYaml('''
kind: Pod
apiVersion: v1
metadata:
  name: dummy-pod
  labels:
    blow: job
spec:
  containers:
  - name: sleepr
    image: busybox
    command:
    - /bin/sh
    - -c
    - sleep 24000
''')



def fromYaml(rawData, client=None, **kwargs):
    ''' invoke the K8s API to create or replace an object given as YAML spec.
        @param rawData: either a string or an opened input stream with a
                        YAML formatted spec, as you'd use for `kubectl apply -f`
        @param client: (optional) preconfigured client environment to use for invocation
        @param kwargs: (optional) further arguments to pass to the create/replace call
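        @return: list of response objects from the Kubernetes API calls, one per YAML document
    '''
    # safe_load_all needs no explicit Loader (PyYAML 6 requires one for load_all)
    # and refuses to construct arbitrary Python objects.
    return [createOrUpdateOrReplace(obj, client, **kwargs)
            for obj in yaml.safe_load_all(rawData) if obj]  # skip empty documents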


def createOrUpdateOrReplace(obj, client=None, **kwargs):
    ''' invoke the K8s API to create or replace a kubernetes object.
        The first attempt is to create(insert) this object; when this is rejected because
        of an existing object with same name, we attempt to patch this existing object.
        As a last resort, if even the patch is rejected, we *delete* the existing object
        and recreate from scratch.
        @param obj: complete object specification, including API version and metadata.
        @param client: (optional) preconfigured client environment to use for invocation
        @param kwargs: (optional) further arguments to pass to the create/replace call
        @return: response object from Kubernetes API call
    '''
    k8sApi = findK8sApi(obj, client)
    try:
        res = invokeApi(k8sApi, 'create', obj, **kwargs)
        logging.debug('K8s: %s created -> uid=%s', describe(obj), res.metadata.uid)
    except kubernetes.client.rest.ApiException as apiEx:
        if apiEx.reason != 'Conflict': raise
        try:
            # asking for forgiveness...
            res = invokeApi(k8sApi, 'patch', obj, **kwargs)
            logging.debug('K8s: %s PATCHED -> uid=%s', describe(obj), res.metadata.uid)
        except kubernetes.client.rest.ApiException as apiEx:
            if apiEx.reason != 'Unprocessable Entity': raise
            try:
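                # second attempt... delete the existing object and re-insert.
                # Deletion is asynchronous, so the create below can still hit a Conflict
                # while the old object is terminating.
                logging.debug('K8s: patching %s FAILED. Attempting deletion and recreation...', describe(obj))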
                res = invokeApi(k8sApi, 'delete', obj, **kwargs)
                logging.debug('K8s: %s DELETED...', describe(obj))
                res = invokeApi(k8sApi, 'create', obj, **kwargs)
                logging.debug('K8s: %s CREATED -> uid=%s', describe(obj), res.metadata.uid)
            except Exception as ex:
                message = 'K8s: FAILURE updating %s. Exception: %s' % (describe(obj), ex)
                logging.error(message)
                raise RuntimeError(message)
    return res


def patchObject(obj, client=None, **kwargs):
    k8sApi = findK8sApi(obj, client)
    try:
        res = invokeApi(k8sApi, 'patch', obj, **kwargs)
        logging.debug('K8s: %s PATCHED -> uid=%s', describe(obj), res.metadata.uid)
        return res
    except kubernetes.client.rest.ApiException as apiEx:
        if apiEx.reason == 'Unprocessable Entity':
            message = 'K8s: patch for %s rejected. Exception: %s' % (describe(obj), apiEx)
            logging.error(message)
            raise RuntimeError(message)
        else:
            raise


def deleteObject(obj, client=None, **kwargs):
    k8sApi = findK8sApi(obj, client)
    try:
        res = invokeApi(k8sApi, 'delete', obj, **kwargs)
        logging.debug('K8s: %s DELETED. uid was: %s', describe(obj), res.details and res.details.uid or '?')
        return True
    except kubernetes.client.rest.ApiException as apiEx:
        if apiEx.reason == 'Not Found':
            logging.warning('K8s: %s does not exist (anymore).', describe(obj))
            return False
        else:
            message = 'K8s: deleting %s FAILED. Exception: %s' % (describe(obj), apiEx)
            logging.error(message)
            raise RuntimeError(message)



def findK8sApi(obj, client=None):
    ''' Investigate the object spec and lookup the corresponding API object
        @param client: (optional) preconfigured client environment to use for invocation
        @return: a client instance wired to the appropriate API
    '''
    grp, _, ver = obj['apiVersion'].partition('/')
    if ver == '':
        ver = grp
        grp = 'core'
    # Strip 'k8s.io', camel-case-join dot separated parts. rbac.authorization.k8s.io -> RbacAuthorization
    grp = ''.join(part.capitalize() for part in grp.rsplit('.k8s.io', 1)[0].split('.'))
    ver = ver.capitalize()

    k8sApi = '%s%sApi' % (grp, ver)
    return getattr(kubernetes.client, k8sApi)(client)


def invokeApi(k8sApi, action, obj, **args):
    ''' find a suitable function and perform the actual API invocation.
        @param k8sApi: client object for the invocation, wired to correct API version
        @param action: either 'create' (to inject a new object) or 'replace','patch','delete'
        @param obj: the full object spec to be passed into the API invocation
        @param args: (optional) extraneous arguments to pass
        @return: response object from Kubernetes API call
    '''
    # transform ActionType from Yaml into action_type for swagger API
    kind = camel2snake(obj['kind'])
    # determine namespace to place the object in, supply default
    namespace = (obj.get('metadata') or {}).get('namespace') or 'default'

    functionName = '%s_%s' %(action,kind)
    if hasattr(k8sApi, functionName):
        # namespace agnostic API
        function = getattr(k8sApi, functionName)
    else:
        functionName = '%s_namespaced_%s' %(action,kind)
        function = getattr(k8sApi, functionName)
        args['namespace'] = namespace
    if action != 'create':
        args['name'] = obj['metadata']['name']
    if action == 'delete':
        obj = kubernetes.client.V1DeleteOptions()

    return function(body=obj, **args)


def describe(obj):
    return "%s '%s'" % (obj['kind'], obj['metadata']['name'])

def camel2snake(string):
    string = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', string)
    string = re.sub('([a-z0-9])([A-Z])', r'\1_\2', string).lower()
    return string


if __name__=='__main__':
    runUsageExample()

Upvotes: 9

zhb

Reputation: 687

I checked the code in pkg/kubectl/cmd/apply.go, and I think the following lines show what happens behind the scenes when you run kubectl apply -f:

// Compute a three way strategic merge patch to send to server.
patch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, 
    versionedObject, true)
helper := resource.NewHelper(info.Client, info.Mapping)
_, err = helper.Patch(info.Namespace, info.Name, api.StrategicMergePatchType, patch)

And here is the code of helper.Patch:

func (m *Helper) Patch(namespace, name string, pt api.PatchType, data []byte) (runtime.Object, error) {
    return m.RESTClient.Patch(pt).
        NamespaceIfScoped(namespace, m.NamespaceScoped).
        Resource(m.Resource).
        Name(name).
        Body(data).
        Do().
        Get()
}
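
To give a feel for what a three-way merge computes, here is a deliberately simplified Python sketch. It is not kubectl's algorithm: the real CreateThreeWayMergePatch produces a strategic merge patch that also merges lists by key, whereas this hypothetical three_way_merge_patch function only handles nested dicts and treats every other value (lists included) as an opaque whole. The three inputs correspond to original (the last applied configuration), modified (the new manifest) and current (the live object).

```python
def three_way_merge_patch(original, modified, current):
    """Simplified three-way JSON merge patch over nested dicts.

    original: configuration applied last time
    modified: new desired configuration
    current:  live object as stored on the server
    """
    patch = {}
    # 1. Additions and changes: whatever `modified` wants that `current`
    #    lacks or holds with a different value.
    for key, want in modified.items():
        have = current.get(key)
        if isinstance(want, dict) and isinstance(have, dict):
            prev = original.get(key)
            sub = three_way_merge_patch(
                prev if isinstance(prev, dict) else {}, want, have)
            if sub:
                patch[key] = sub
        elif want != have:
            patch[key] = want
    # 2. Deletions: keys that were applied last time but are gone from the
    #    new manifest. In a merge patch, None (JSON null) means "remove".
    for key in original:
        if key not in modified and key in current:
            patch[key] = None
    return patch
```

Fields that exist only in the live object (set by controllers or other users) appear in neither loop, so they are left untouched. That is the reason for comparing three versions rather than two.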

Upvotes: 10
