CORD-2965 Kubernetes Synchronizer
Change-Id: Ie5c02b9ad1c65af686598bab0b36350ac1caef64
diff --git a/xos/synchronizer/pull_steps/pull_pods.py b/xos/synchronizer/pull_steps/pull_pods.py
index bea45d7..8d5d544 100644
--- a/xos/synchronizer/pull_steps/pull_pods.py
+++ b/xos/synchronizer/pull_steps/pull_pods.py
@@ -19,7 +19,8 @@
"""
from synchronizers.new_base.pullstep import PullStep
-from synchronizers.new_base.modelaccessor import KubernetesServiceInstance
+from synchronizers.new_base.modelaccessor import KubernetesServiceInstance, KubernetesService, Slice, Principal, \
+ TrustDomain, Site, Image
from xosconfig import Config
from multistructlog import create_logger
@@ -32,16 +33,208 @@
"""
KubernetesServiceInstancePullStep
- Pull information from Kubernetes.
+ Pull pod-related information from Kubernetes. Each pod we find is used to create a KubernetesServiceInstance
+ if one does not already exist. Additional support objects (Slices, TrustDomains, Principals) may be created
+ as necessary to fill the required dependencies of the KubernetesServiceInstance.
"""
def __init__(self):
    """Register this pull step for KubernetesServiceInstance and open Kubernetes API clients.

    The in-cluster Kubernetes configuration is loaded first.  Two clients are then
    kept on the instance: one for the core API group (used by this step for pods,
    namespaces and service accounts) and one for the apps API group (used for
    ReplicaSets, StatefulSets, DaemonSets and Deployments).
    """
    super(KubernetesServiceInstancePullStep, self).__init__(observed_model=KubernetesServiceInstance)
    kubernetes_config.load_incluster_config()
    core_api = kubernetes_client.CoreV1Api()
    self.v1core = core_api
    apps_api = kubernetes_client.AppsV1Api()
    self.v1apps = apps_api
+
def obj_to_handle(self, obj):
    """Return the handle used to identify a Kubernetes resource from XOS.

    The handle is the resource's ``metadata.self_link``.  It is what this step
    stores in the ``backend_handle`` field of the XOS objects it creates.

    NOTE(review): newer Kubernetes releases stop populating selfLink, in which
    case this returns None -- confirm against the target cluster version.
    """
    metadata = obj.metadata
    handle = metadata.self_link
    return handle
+
def read_obj_kind(self, kind, name, trust_domain):
    """Read a workload controller of the given kind from Kubernetes.

    :param kind: the ``kind`` string from an owner reference, e.g. "ReplicaSet".
    :param name: name of the object to read.
    :param trust_domain: XOS TrustDomain whose ``name`` is used as the namespace.
    :returns: the resource read through the apps/v1 API, or None when ``kind`` is
        not one of ReplicaSet, StatefulSet, DaemonSet or Deployment.
    """
    # Supported kinds mapped to the AppsV1Api reader method for each.
    readers = {
        "ReplicaSet": "read_namespaced_replica_set",
        "StatefulSet": "read_namespaced_stateful_set",
        "DaemonSet": "read_namespaced_daemon_set",
        "Deployment": "read_namespaced_deployment",
    }
    method_name = readers.get(kind)
    if method_name is None:
        return None
    return getattr(self.v1apps, method_name)(name, trust_domain.name)
+
def get_controller_from_obj(self, obj, trust_domain, depth=0):
    """Find the top-level controller that owns a Kubernetes object.

    Walks ``metadata.owner_references`` upward, following only references marked
    as controllers, until it reaches an object that has no owners itself.  This
    skips over intermediate controllers: a ReplicaSet owned by a Deployment
    resolves to the Deployment.

    :param obj: Kubernetes resource to start from (typically a pod).
    :param trust_domain: XOS TrustDomain naming the namespace owners are read from.
    :param depth: recursion depth; 0 means ``obj`` is the starting object.
    :returns: the top-level controller, or None if ``obj`` has no controller or
        every controller chain passes through a kind ``read_obj_kind`` can't read.
    """
    owner_references = obj.metadata.owner_references
    if not owner_references:
        if depth == 0:
            # Still looking at the starting object, which is not a controller.
            return None
        return obj

    for owner_reference in owner_references:
        if not getattr(owner_reference, "controller", False):
            continue
        owner = self.read_obj_kind(owner_reference.kind, owner_reference.name, trust_domain)
        if owner is None:
            # Unsupported owner kind (e.g. Job) -- skip it instead of recursing on None.
            continue
        controller = self.get_controller_from_obj(owner, trust_domain, depth + 1)
        if controller:
            return controller

    return None
+
def get_slice_name_from_controller(self, controller):
    """Return the XOS Slice name to use for a Kubernetes controller.

    Prefers the controller's ``xos_slice_name`` label and otherwise falls back to
    the controller's own name.  ``metadata.labels`` may be None or missing.
    """
    labels = getattr(controller.metadata, "labels", None) or {}
    # Someone may have labeled the controller with an xos slice name; use it if so.
    return labels.get("xos_slice_name", controller.metadata.name)

def get_slice_from_pod(self, pod, trust_domain, principal):
    """Return the XOS Slice for a pod's top-level controller, creating it if needed.

    :param pod: Kubernetes pod.
    :param trust_domain: XOS TrustDomain for the pod's namespace.
    :param principal: XOS Principal recorded on a newly created Slice (may be None).
    :returns: the existing or newly saved Slice, or None if the pod has no
        controller that can be resolved.
    """
    controller = self.get_controller_from_obj(pod, trust_domain)
    if not controller:
        return None

    slice_name = self.get_slice_name_from_controller(controller)

    existing_slices = Slice.objects.filter(name = slice_name)
    if existing_slices:
        return existing_slices[0]

    # TODO(smbaker): atomicity
    new_slice = Slice(name=slice_name, site = Site.objects.first(),
                      trust_domain=trust_domain,
                      principal=principal,
                      backend_handle=self.obj_to_handle(controller),
                      controller_kind=controller.kind,
                      xos_managed=False)
    new_slice.save()
    return new_slice
+
def get_trustdomain_from_pod(self, pod, owner_service):
    """Return the XOS TrustDomain matching the pod's namespace, creating it if absent.

    A TrustDomain is matched by name against ``pod.metadata.namespace``.  When none
    exists, the namespace is read from Kubernetes and a new, non-XOS-managed
    TrustDomain owned by ``owner_service`` is saved and returned.
    """
    namespace = pod.metadata.namespace
    matches = TrustDomain.objects.filter(name = namespace)
    if matches:
        return matches[0]

    k8s_namespace = self.v1core.read_namespace(namespace)

    # TODO(smbaker): atomicity
    trust_domain = TrustDomain(name = namespace,
                               xos_managed=False,
                               owner=owner_service,
                               backend_handle = self.obj_to_handle(k8s_namespace))
    trust_domain.save()
    return trust_domain
+
def get_principal_from_pod(self, pod, trust_domain):
    """Return the XOS Principal for the pod's service account, creating it if needed.

    Kubernetes service account names are scoped to a namespace, so the lookup is
    restricted to ``trust_domain``.  Matching on the name alone would hand a pod a
    Principal that belongs to a different TrustDomain.

    :param pod: Kubernetes pod.
    :param trust_domain: XOS TrustDomain for the pod's namespace.
    :returns: the existing or newly saved Principal, or None when the pod spec
        names no service account.
    """
    spec = pod.spec
    principal_name = getattr(spec, "service_account", None) or getattr(spec, "service_account_name", None)
    if not principal_name:
        return None

    existing_principals = Principal.objects.filter(name = principal_name,
                                                   trust_domain_id = trust_domain.id)
    if existing_principals:
        return existing_principals[0]

    k8s_service_account = self.v1core.read_namespaced_service_account(principal_name, trust_domain.name)

    # TODO(smbaker): atomicity
    principal = Principal(name = principal_name,
                          trust_domain = trust_domain,
                          xos_managed = False,
                          backend_handle = self.obj_to_handle(k8s_service_account))
    principal.save()
    return principal
+
def split_image_reference(self, image):
    """Split a container image reference into ``(name, tag)``.

    The tag is the text after the last ":" unless that text contains a "/".  In
    that case the colon belongs to a registry ``host:port`` and the reference has
    no tag.  References without a tag get the tag "master".
    """
    name, sep, tag = image.rpartition(":")
    if not sep or "/" in tag:
        # Is assuming a default necessary?
        return image, "master"
    return name, tag

def get_image_from_pod(self, pod):
    """Return the XOS Image for the pod's first container, creating it if needed.

    :param pod: Kubernetes pod.
    :returns: the existing or newly saved container Image, or None if the pod
        spec lists no containers.
    """
    containers = pod.spec.containers
    if not containers:
        return None

    # TODO(smbaker): Assumes all containers in a pod use the same image. Valid assumption for now?
    name, tag = self.split_image_reference(containers[0].image)

    existing_images = Image.objects.filter(name=name, tag=tag, kind="container")
    if existing_images:
        return existing_images[0]

    image = Image(name=name, tag=tag, kind="container", xos_managed=False)
    image.save()
    return image
def pull_records(self):
    """Reconcile XOS KubernetesServiceInstances with the pods running in Kubernetes.

    A KubernetesServiceInstance is created for every Kubernetes pod that has no
    same-named instance in XOS.  Any TrustDomain, Principal, Slice and Image it
    needs is created along the way.  Every non-xos_managed instance whose pod is
    no longer in Kubernetes is deleted.

    NOTE: pods are matched by ``metadata.name`` only, so same-named pods in
    different namespaces collide.

    :raises Exception: if there is not exactly one KubernetesService.
    """
    # Read all pods from Kubernetes, keyed by name.
    ret = self.v1core.list_pod_for_all_namespaces(watch=False)
    k8s_pods_by_name = {item.metadata.name: item for item in ret.items}

    # Read all pods from XOS, keyed by name.
    xos_pods_by_name = {pod.name: pod for pod in KubernetesServiceInstance.objects.all()}

    kubernetes_services = KubernetesService.objects.all()
    if len(kubernetes_services) == 0:
        raise Exception("There are no Kubernetes Services yet")
    if len(kubernetes_services) > 1:
        # Simplifying assumption -- there is only one Kubernetes Service
        raise Exception("There are too many Kubernetes Services")
    kubernetes_service = kubernetes_services[0]

    # For each k8s pod, see if there is an xos pod. If there is not, then create the xos pod.
    for k, pod in k8s_pods_by_name.items():
        if k in xos_pods_by_name:
            continue

        trust_domain = self.get_trustdomain_from_pod(pod, owner_service=kubernetes_service)
        if not trust_domain:
            log.warning("Unable to determine trust_domain for %s" % k)
            continue

        principal = self.get_principal_from_pod(pod, trust_domain)
        xos_slice = self.get_slice_from_pod(pod, trust_domain=trust_domain, principal=principal)
        if not xos_slice:
            log.warning("Unable to determine slice for %s" % k)
            continue

        # Resolve (and possibly create) the Image only once the pod is known to be
        # recorded, so skipped pods don't leave orphan Image objects behind.
        image = self.get_image_from_pod(pod)

        xos_pod = KubernetesServiceInstance(name=k,
                                            pod_ip = pod.status.pod_ip,
                                            owner = kubernetes_service,
                                            slice = xos_slice,
                                            image = image,
                                            backend_handle = self.obj_to_handle(pod),
                                            xos_managed = False)
        xos_pod.save()
        log.info("Created XOS POD %s" % xos_pod.name)

    # For each xos pod, see if there is no k8s pod. If that's the case, then the pod must have been deleted.
    for k, xos_pod in xos_pods_by_name.items():
        if k in k8s_pods_by_name:
            continue
        if xos_pod.xos_managed:
            # Should we do something so it gets re-created by the syncstep?
            continue
        xos_pod.delete()
        log.info("Deleted XOS POD %s" % k)