|
import os |
|
import subprocess |
|
import pytest |
|
|
|
from ding.utils import K8sLauncher, OrchestratorLauncher |
|
|
|
# The Kubernetes client is an optional dependency: record whether it can be
# imported, then pick the pytest mark applied to the cluster test below.
# NOTE(review): `ignore` is not one of pytest's built-in marks; presumably the
# suite selects tests with `-m envtest`, so this mark only keeps the test out
# of that selection -- confirm against the project's pytest configuration.
try:
    from kubernetes import config, client, watch
except ImportError:
    _kubernetes_available = False
else:
    _kubernetes_available = True

_test_mark = pytest.mark.envtest if _kubernetes_available else pytest.mark.ignore
|
|
|
|
|
def _count_crds(extension_api, names):
    """Return how many CRDs currently listed by *extension_api* are named in *names*.

    ``extension_api`` is expected to expose ``list_custom_resource_definition()``
    returning an object whose ``items`` carry ``metadata.name``.
    """
    crds = extension_api.list_custom_resource_definition()
    return sum(crd.metadata.name in names for crd in crds.items)


@_test_mark
def test_operate_k8s_cluster():
    """Exercise the cluster / orchestrator lifecycle end to end.

    Steps:
      1. Create a cluster through ``K8sLauncher`` and check that the active
         kube context points at a cluster named ``k3d-<cluster_name>...``.
      2. Create the ``di-system`` namespace and install the orchestrator,
         then check the number of deployments and CRDs that appear.
      3. Delete the orchestrator and wait (bounded) for the CRD deletion.
      4. Delete the cluster and check the active context no longer points
         at it.

    The cluster is deleted in a ``finally`` block so that a failing assertion
    in steps 1-3 does not leave a cluster running behind the test.
    """
    cluster_name = 'test-k8s-launcher'
    config_path = os.path.join(os.path.dirname(__file__), 'config', 'k8s-config.yaml')
    launcher = K8sLauncher(config_path)
    launcher.name = cluster_name

    # create cluster (outside the try: if creation itself fails there is
    # nothing this test knows how to clean up)
    launcher.create_cluster()
    try:
        # check that the cluster is the active kube context
        config.load_kube_config()
        # Index 1 of list_kube_config_contexts() is the active context
        # (index 0 is the list of all contexts), per the kubernetes client API.
        current_context = config.list_kube_config_contexts()[1]
        assert current_context['context']['cluster'].startswith(f"k3d-{cluster_name}")
        # Fixed command, so pass an argument list instead of going through a
        # shell. The exit status is deliberately left unchecked, as before.
        subprocess.run(['kubectl', 'create', 'ns', 'di-system'])

        # create orchestrator
        olauncher = OrchestratorLauncher('v1.1.3', cluster=launcher)
        olauncher.create_orchestrator()

        # check that the orchestrator's deployments exist
        expected_deployments, expected_crds = 2, 1
        appv1 = client.AppsV1Api()
        ret = appv1.list_namespaced_deployment("di-system")
        assert len(ret.items) == expected_deployments

        # check that the orchestrator's CRDs exist; either of the two known
        # names counts towards the expected total
        extensionv1 = client.ApiextensionsV1Api()
        orchestrator_crds = {
            'aggregatorconfigs.diengine.opendilab.org',
            'dijobs.diengine.opendilab.org',
        }
        assert _count_crds(extensionv1, orchestrator_crds) == expected_crds

        # delete orchestrator
        olauncher.delete_orchestrator()

        # watch CRD events until the expected number of deletions is seen,
        # or until the server-side timeout ends the stream
        timeout = 10
        deleted_crds = 0
        w = watch.Watch()
        for event in w.stream(extensionv1.list_custom_resource_definition, timeout_seconds=timeout):
            if event['type'] == "DELETED":
                deleted_crds += 1
                if deleted_crds == expected_crds:
                    w.stop()
        assert _count_crds(extensionv1, {'dijobs.diengine.opendilab.org'}) == 0
    finally:
        # always tear the cluster down, even when a check above failed
        launcher.delete_cluster()

    # After deletion the kube config may no longer be loadable at all; that
    # case is reported and accepted rather than treated as a failure.
    try:
        config.load_kube_config()
    except Exception:
        print("No k8s cluster found, skipped...")
    else:
        current_context = config.list_kube_config_contexts()[1]
        assert not current_context['context']['cluster'].startswith(f"k3d-{cluster_name}")
|
|