Instana Blog

Date: July 2, 2019

Writing a Kubernetes Operator in Java: Part 3

Category: Engineering

In the previous post of this series, we created an example Quarkus project using the Kubernetes client extension. The example retrieved a list of Pods and logged the list to the console.

In this post, we will extend this example with functionality that you typically find in operators.

Functionality of the Example Operator

At Instana, we recently published the first alpha version of our Instana Agent Operator. The main task of the operator is to create a DaemonSet such that one Instana agent is running on each Kubernetes node.

However, there are special roles that must be present only once in the cluster. The operator ensures that exactly one Pod from the daemon set is assigned the special role.

In this blog post, we will implement a stripped-down version of this functionality: we will define a custom resource specifying some special roles, and implement an operator that creates a daemon set whenever such a resource is created.

Custom Resource Model

Our example resource looks like this:

apiVersion: instana.com/v1alpha1
kind: Example
metadata:
  name: example
spec:
  specialRoles:
  - A
  - B
  - C

Reading the custom resource is simple, because the Kubernetes client automatically deserializes the resource to a Java object. All we need to do is define a model class for the spec using Jackson annotations, along with some boilerplate classes.

Our model class models the specialRoles as a Set of String:

package com.instana.operator.example.cr;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import io.quarkus.runtime.annotations.RegisterForReflection;

import java.util.Arrays;
import java.util.Set;

@JsonDeserialize
@RegisterForReflection
public class ExampleResourceSpec {

  @JsonProperty("specialRoles")
  private Set<String> specialRoles;

  public Set<String> getSpecialRoles() {
    return specialRoles;
  }

  @Override
  public String toString() {
    return "specialRoles=" + Arrays.toString(specialRoles.toArray(new String[] {}));
  }
}

Apart from the standard Jackson annotations, this class is annotated with @RegisterForReflection. This is needed when building native executables: it tells the native image compiler that instances of this class are created reflectively at runtime, so the constructor must not be optimized away.
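
If you want to verify the mapping in isolation, a small standalone check can deserialize the spec portion of the YAML. This is a hypothetical snippet, assuming the jackson-dataformat-yaml module (which is not part of the example project) is on the classpath:

package com.instana.operator.example.cr;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

public class SpecMappingCheck {

  public static void main(String[] args) throws Exception {
    // The spec portion of the custom resource from above.
    String yaml = "specialRoles:\n- A\n- B\n- C\n";
    ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
    ExampleResourceSpec spec = mapper.readValue(yaml, ExampleResourceSpec.class);
    System.out.println(spec); // should print: specialRoles=[A, B, C]
  }
}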

Next, there are three boilerplate classes needed to use the custom resource with the fabric8 Kubernetes client:

Class ExampleResource:

package com.instana.operator.example.cr;

import io.fabric8.kubernetes.client.CustomResource;

public class ExampleResource extends CustomResource {

  private ExampleResourceSpec spec;

  public ExampleResourceSpec getSpec() {
    return spec;
  }

  public void setSpec(ExampleResourceSpec spec) {
    this.spec = spec;
  }

  @Override
  public String toString() {
    String name = getMetadata() != null ? getMetadata().getName() : "unknown";
    String version = getMetadata() != null ? getMetadata().getResourceVersion() : "unknown";
    return "name=" + name + " version=" + version + " value=" + spec;
  }
}

Class ExampleResourceList:

package com.instana.operator.example.cr;

import io.fabric8.kubernetes.client.CustomResourceList;

public class ExampleResourceList extends CustomResourceList<ExampleResource> {
  // empty
}

Class ExampleResourceDoneable:

package com.instana.operator.example.cr;

import io.fabric8.kubernetes.api.builder.Function;
import io.fabric8.kubernetes.client.CustomResourceDoneable;

public class ExampleResourceDoneable extends CustomResourceDoneable<ExampleResource> {

  public ExampleResourceDoneable(ExampleResource resource, Function<ExampleResource, ExampleResource> function) {
    super(resource, function);
  }
}

Custom Resource Client

Now that we have a model for our custom resource, we need a client to list and watch our resources. With the Kubernetes client API, we will have different clients for accessing built-in Kubernetes resources and custom resources:

  • The KubernetesClient is used to access built-in resources. We already created a producer method for this client in our ClientProvider in the previous post.
  • For accessing the custom resource, we need to create a custom resource client. To do that, the Kubernetes client API requires us to load the Custom Resource Definition (CRD) from the API server and create the custom resource client from the CRD. The custom resource client has the rather awkward type NonNamespaceOperation with a long list of generic type parameters.

We add a producer method for the custom resource client to our existing ClientProvider, so that we can @Inject the custom resource client where needed:

@Produces
@Singleton
NonNamespaceOperation<ExampleResource, ExampleResourceList, ExampleResourceDoneable, Resource<ExampleResource, ExampleResourceDoneable>>
    makeCustomResourceClient(KubernetesClient defaultClient, @Named("namespace") String namespace) {

  KubernetesDeserializer.registerCustomKind("instana.com/v1alpha1", "Example", ExampleResource.class);

  CustomResourceDefinition crd = defaultClient
      .customResourceDefinitions()
      .list()
      .getItems()
      .stream()
      .filter(d -> "examples.instana.com".equals(d.getMetadata().getName()))
      .findAny()
      .orElseThrow(
          () -> new RuntimeException("Deployment error: Custom resource definition examples.instana.com not found."));

  return defaultClient
      .customResources(crd, ExampleResource.class, ExampleResourceList.class, ExampleResourceDoneable.class)
      .inNamespace(namespace);
}
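
With this producer in place, the client can be injected wherever it is needed. A minimal usage sketch (the ExampleLister bean is hypothetical, not part of the example project):

@ApplicationScoped
public class ExampleLister {

  @Inject
  NonNamespaceOperation<ExampleResource, ExampleResourceList, ExampleResourceDoneable,
      Resource<ExampleResource, ExampleResourceDoneable>> crClient;

  void logExamples() {
    // Lists all Example resources in the configured namespace.
    crClient.list().getItems()
        .forEach(r -> System.out.println("found custom resource: " + r));
  }
}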

Resource Cache

At the core of a Kubernetes operator is a resource cache: for all watched resources, we want to keep a local cache of the resource state. This cache should be eventually consistent with the resource state on the API server. We model our cache as a Map<String, ExampleResource>, mapping each resource’s uid to its current state from the cluster.

In order to maintain this cache, we need a list-then-watch operation: When starting up, we initialize the cache with the current state of the resources, then we watch for changes and update the cache accordingly.

Kubernetes provides a resource version for each resource, which is an integer that increases with each change. When we receive an event, we can use the resource version to tell whether the event is new or outdated. Unfortunately, the resource version is modeled as a String in the Kubernetes client, so we need to convert it to an integer.
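
The staleness check is a one-liner once both versions are parsed. As a sketch (the cache implementation below inlines the same logic):

// An event is outdated if its resource version is lower than the version we already know.
private static boolean isOutdated(String knownResourceVersion, String receivedResourceVersion) {
  return Integer.parseInt(knownResourceVersion) > Integer.parseInt(receivedResourceVersion);
}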

As a hook for our business logic, the listThenWatch() method takes a callback as an argument. This callback is called with a pair of (Action, UID), where the action is an enum with values ADDED, MODIFIED, or DELETED.

package com.instana.operator.example;

import com.instana.operator.example.cr.ExampleResource;
import com.instana.operator.example.cr.ExampleResourceDoneable;
import com.instana.operator.example.cr.ExampleResourceList;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

@ApplicationScoped
public class ExampleResourceCache {

  private final Map<String, ExampleResource> cache = new ConcurrentHashMap<>();

  @Inject
  private NonNamespaceOperation<ExampleResource, ExampleResourceList, ExampleResourceDoneable, Resource<ExampleResource, ExampleResourceDoneable>> crClient;

  private Executor executor = Executors.newSingleThreadExecutor();

  public ExampleResource get(String uid) {
    return cache.get(uid);
  }

  public void listThenWatch(BiConsumer<Watcher.Action, String> callback) {

    try {

      // list

      crClient
          .list()
          .getItems()
          .forEach(resource -> {
            String uid = resource.getMetadata().getUid();
            cache.put(uid, resource);
            executor.execute(() -> callback.accept(Watcher.Action.ADDED, uid));
          });

      // watch

      crClient.watch(new Watcher<ExampleResource>() {
        @Override
        public void eventReceived(Action action, ExampleResource resource) {
          try {
            String uid = resource.getMetadata().getUid();
            if (cache.containsKey(uid)) {
              int knownResourceVersion = Integer.parseInt(cache.get(uid).getMetadata().getResourceVersion());
              int receivedResourceVersion = Integer.parseInt(resource.getMetadata().getResourceVersion());
              if (knownResourceVersion > receivedResourceVersion) {
                return;
              }
            }
            System.out.println("received " + action + " for resource " + resource);
            if (action == Action.ADDED || action == Action.MODIFIED) {
              cache.put(uid, resource);
            } else if (action == Action.DELETED) {
              cache.remove(uid);
            } else {
              System.err.println("Received unexpected " + action + " event for " + resource);
              System.exit(-1);
            }
            executor.execute(() -> callback.accept(action, uid));
          } catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
          }
        }

        @Override
        public void onClose(KubernetesClientException cause) {
          cause.printStackTrace();
          System.exit(-1);
        }
      });
    } catch (Exception e) {
      e.printStackTrace();
      System.exit(-1);
    }
  }
}

A few notes on the code above:

  • For each resource that we find in the initial list operation, we generate an artificial ADDED event so that the business logic knows there is a new resource.
  • In the watch part, we discard events whose resource version is older than our last known resource version; otherwise we update the cache and generate an event.
  • The eventReceived() callback in the watch part may be executed in a different thread. Therefore, it is important to handle exceptions there, as they will not be propagated to the main thread.
  • We don’t want to block the Kubernetes client’s event handler thread, so we create a single-thread executor and schedule our callbacks there. As an additional advantage, this implementation ensures that the callback is always called on the same thread, so the callback does not need to be thread-safe.
  • It is a good idea to handle errors (like networking errors) with System.exit(-1). This terminates the operator Pod, and Kubernetes will restart it. If the error is permanent, the Pod will end up in a CrashLoopBackOff.

Business Logic: Creating Daemon Sets

Now that we have our resource cache, we hook in our business logic that will create a DaemonSet whenever a new custom resource is added.

package com.instana.operator.example;

import com.instana.operator.example.cr.ExampleResource;
import io.fabric8.kubernetes.api.model.apps.DaemonSet;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.Watcher;
import io.quarkus.runtime.StartupEvent;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import java.util.function.Predicate;

@ApplicationScoped
public class DaemonSetInstaller {

  @Inject
  private KubernetesClient client;

  @Inject
  private ExampleResourceCache cache;

  void onStartup(@Observes StartupEvent _ev) {
    new Thread(this::runWatch).start();
  }

  private void runWatch() {
    cache.listThenWatch(this::handleEvent);
  }

  private void handleEvent(Watcher.Action action, String uid) {
    try {
      ExampleResource resource = cache.get(uid);
      if (resource == null) {
        return;
      }

      Predicate<DaemonSet> ownerRefMatches = daemonSet -> daemonSet.getMetadata().getOwnerReferences().stream()
          .anyMatch(ownerReference -> ownerReference.getUid().equals(uid));

      if (client
          .apps()
          .daemonSets()
          .list()
          .getItems()
          .stream()
          .noneMatch(ownerRefMatches)) {

        client
            .apps()
            .daemonSets()
            .create(newDaemonSet(resource));
      }
    } catch (Exception e) {
      e.printStackTrace();
      System.exit(-1);
    }
  }

  private DaemonSet newDaemonSet(ExampleResource resource) {
    DaemonSet daemonSet = client.apps().daemonSets()
        .load(getClass().getResourceAsStream("/daemonset.yaml")).get();
    daemonSet.getMetadata().getOwnerReferences().get(0).setUid(resource.getMetadata().getUid());
    daemonSet.getMetadata().getOwnerReferences().get(0).setName(resource.getMetadata().getName());
    return daemonSet;
  }
}

Some notes on this code:

  • We start watching custom resources on application startup when we receive the CDI StartupEvent, but we immediately fork off a new thread in order not to block the CDI event handler thread.
  • The handleEvent() callback is straightforward: it uses the custom resource’s uid as an owner reference, checks whether a daemon set with that owner reference already exists, and creates a new one if it doesn’t. Owner references were covered in the first post of this series; an alternative way to set them is sketched below.
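
As an aside, instead of patching placeholders in the loaded YAML (as newDaemonSet() does), the owner reference could also be built programmatically with fabric8’s builder API. A sketch of such an alternative, not what the example project uses:

import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
import java.util.Collections;

private void setOwnerReference(DaemonSet daemonSet, ExampleResource resource) {
  OwnerReference ownerReference = new OwnerReferenceBuilder()
      .withApiVersion("instana.com/v1alpha1") // apiVersion and kind of the owning custom resource
      .withKind("Example")
      .withName(resource.getMetadata().getName())
      .withUid(resource.getMetadata().getUid())
      .build();
  daemonSet.getMetadata().setOwnerReferences(Collections.singletonList(ownerReference));
}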

The daemon set itself is defined in src/main/resources/daemonset.yaml:

apiVersion: apps/v1
kind: DaemonSet
metadata:
  generateName: example-
  namespace: default
  ownerReferences:
    - apiVersion: instana.com/v1alpha1
      kind: Example
      name: placeholder
      uid: placeholder
spec:
  selector:
    matchLabels:
      app: kuard
  template:
    metadata:
      labels:
        app: kuard
    spec:
      containers:
        - image: gcr.io/kuar-demo/kuard-amd64:blue
          imagePullPolicy: IfNotPresent
          name: kuard

In order to have Quarkus include the daemonset.yaml file in the native build, we need to add the following parameter to the quarkus-maven-plugin configuration in the native profile in pom.xml:

<additionalBuildArgs>-H:IncludeResources=daemonset.yaml</additionalBuildArgs>

Build and run

Create the Docker image with the commands from the previous blog post:

./mvnw package -Pnative -DskipTests -Dnative-image.docker-build=true
docker build -f src/main/docker/Dockerfile.native -t quarkus-quickstart/getting-started .

Before deploying the operator, we need to create the custom resource definition, because the operator reads the custom resource definition on startup in ClientProvider.makeCustomResourceClient().

Create a file operator-example.crd.yaml with the following content:

apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: examples.instana.com
spec:
  group: instana.com
  names:
    kind: Example
    listKind: ExampleList
    plural: examples
    singular: example
  scope: Namespaced
  version: v1alpha1

Apply operator-example.crd.yaml:

kubectl apply -f operator-example.crd.yaml

The operator needs more access rights than in the previous blog post, because it lists and watches the custom resource and creates the daemon set. Extend the operator-example.clusterrole.yaml file from the previous blog post as follows:

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: operator-example
rules:
- apiGroups:
  - apps
  resources:
  - daemonsets
  verbs:
  - list
  - get
  - create
  - update
- apiGroups:
  - extensions
  resources:
  - daemonsets
  verbs:
  - get
- apiGroups:
  - apiextensions.k8s.io
  resources:
  - customresourcedefinitions
  verbs:
  - list
- apiGroups:
  - instana.com
  resources:
  - examples
  verbs:
  - list
  - watch
- apiGroups:
  - ""
  resources:
  - pods
  verbs:
  - list

Apply operator-example.clusterrole.yaml:

kubectl apply -f operator-example.clusterrole.yaml

Finally, we are ready to deploy our operator. We use the operator-example.deployment.yaml file from the previous blog post:

kubectl apply -f operator-example.deployment.yaml

The operator should start up and wait for custom resources. In order to create a custom resource, copy the example YAML from the top of this blog post into a file called operator-example.cr.yaml and apply it:

kubectl apply -f operator-example.cr.yaml

You should see in the operator logs that the operator observed an ADDED event for the resource, and kubectl get daemonsets should show that a daemon set was created.

If you remove the custom resource with kubectl delete -f operator-example.cr.yaml, the daemon set should be removed as well, due to the owner-reference-based garbage collection described in the first blog post.

Summary

In this series, we showed how to implement a Kubernetes operator in Java, using Quarkus and the Kubernetes client extension. We started off with an introduction to Kubernetes operators, moved on to creating our first Quarkus project with the Kubernetes client extension, and finally implemented some more realistic operator functionality in this blog post, like watching custom resources.

If you want to carry on, the next steps would be to watch the Pods so that the operator knows when Pods change their status to running, and to assign the special roles to those Pods. Moreover, you might want to watch the daemon sets and re-create them if they are accidentally deleted.
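
A watch on Pods with the fabric8 client could look like the following sketch. This is hypothetical follow-up code, assuming the injected KubernetesClient and namespace from the examples above, plus get and watch permissions on pods in the ClusterRole:

client.pods().inNamespace(namespace).watch(new Watcher<Pod>() {
  @Override
  public void eventReceived(Action action, Pod pod) {
    String phase = pod.getStatus() != null ? pod.getStatus().getPhase() : "unknown";
    System.out.println("pod " + pod.getMetadata().getName() + ": " + action + ", phase=" + phase);
    // Here the operator could assign one of the special roles once the Pod is Running.
  }

  @Override
  public void onClose(KubernetesClientException cause) {
    if (cause != null) { // null means the watch was closed gracefully
      cause.printStackTrace();
      System.exit(-1);
    }
  }
});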

The first alpha version of our Instana Agent Operator is available on GitHub, so if you want to have a look at a more complete example, this might be a good reference.
