File size: 2,556 Bytes
079c32c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import os
import subprocess
import pytest

from ding.utils import K8sLauncher, OrchestratorLauncher

# The kubernetes client is an optional dependency: pick the decorator applied to
# the test below depending on whether it can be imported.
try:
    from kubernetes import config, client, watch
except ImportError:
    # An arbitrary custom mark does not prevent pytest from running the test; it
    # would still execute and fail on the missing ``config``/``client``/``watch``
    # names after already having created a cluster. Skip explicitly instead.
    _test_mark = pytest.mark.skip(reason="kubernetes python client is not installed")
else:
    _test_mark = pytest.mark.envtest


def _count_crds(extension_api, names):
    """Return how many installed CustomResourceDefinitions have a name in ``names``."""
    installed = extension_api.list_custom_resource_definition().items
    return sum(1 for crd in installed if crd.metadata.name in names)


@_test_mark
def test_operate_k8s_cluster():
    """Create a cluster, install and remove the orchestrator, then delete the cluster.

    The steps are: create the cluster through ``K8sLauncher``, check the active
    kube context points at it, install the orchestrator through
    ``OrchestratorLauncher``, check the deployments and CRDs it installs, delete
    the orchestrator and check the ``dijobs`` CRD is gone, and finally delete the
    cluster and check the active kube context no longer points at it.

    The cluster is deleted even when an intermediate check fails, so a failing
    run does not leave a cluster behind.
    """
    cluster_name = 'test-k8s-launcher'
    config_path = os.path.join(os.path.dirname(__file__), 'config', 'k8s-config.yaml')
    launcher = K8sLauncher(config_path)
    launcher.name = cluster_name

    # create cluster
    launcher.create_cluster()
    try:
        # check that cluster is successfully created;
        # list_kube_config_contexts() returns (all_contexts, active_context)
        config.load_kube_config()
        current_context = config.list_kube_config_contexts()[1]
        assert current_context['context']['cluster'].startswith(f"k3d-{cluster_name}")
        # The exit status is deliberately not checked, as in the original test.
        # NOTE(review): presumably the namespace may already exist - confirm.
        subprocess.run(['kubectl', 'create', 'ns', 'di-system'])

        # create orchestrator
        olauncher = OrchestratorLauncher('v1.1.3', cluster=launcher)
        olauncher.create_orchestrator()

        # check orchestrator is successfully created
        expected_deployments, expected_crds = 2, 1
        appv1 = client.AppsV1Api()
        deployments = appv1.list_namespaced_deployment("di-system")
        assert len(deployments.items) == expected_deployments

        # check crds are installed
        extensionv1 = client.ApiextensionsV1Api()
        orchestrator_crds = {
            'aggregatorconfigs.diengine.opendilab.org',
            'dijobs.diengine.opendilab.org',
        }
        assert _count_crds(extensionv1, orchestrator_crds) == expected_crds

        # delete orchestrator
        olauncher.delete_orchestrator()

        # watch CRD events for up to `timeout` seconds, stopping early once the
        # expected number of deletions has been observed
        timeout = 10
        deleted_crds = 0
        w = watch.Watch()
        for event in w.stream(extensionv1.list_custom_resource_definition, timeout_seconds=timeout):
            if event['type'] == "DELETED":
                deleted_crds += 1
            if deleted_crds == expected_crds:
                w.stop()

        # check crds are deleted
        assert _count_crds(extensionv1, {'dijobs.diengine.opendilab.org'}) == 0
    finally:
        # delete cluster, also when one of the checks above failed
        launcher.delete_cluster()

    # check that the deleted cluster is no longer the active kube context
    try:
        config.load_kube_config()
    except Exception:
        # Best effort: with no usable kube config left there is nothing to check.
        print("No k8s cluster found, skipped...")
    else:
        current_context = config.list_kube_config_contexts()[1]
        assert not current_context['context']['cluster'].startswith(f"k3d-{cluster_name}")