# File size: 5,265 Bytes
# bc69905
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
# (line-number gutter residue from a copied file view removed)
import requests
import zipfile 
import tempfile

def group_tests_by_duration(file_path: str) -> dict:
    """Bucket test durations parsed from a pytest ``--durations`` report file.

    Each parseable line is expected to look like
    ``"<seconds>s <phase> <test name...>"`` (e.g.
    ``"2.31s call tests/test_x.py::test_y"``). Lines that do not match are
    skipped silently.

    Args:
        file_path: Path to the plain-text durations report.

    Returns:
        Mapping of bucket label ("0-5s", "5-10s", "10-15s", "15-20s", ">20s")
        to a list of ``(duration_seconds, test_name)`` tuples.
    """
    # Half-open duration intervals and their display labels, index-aligned.
    buckets = [(0, 5), (5, 10), (10, 15), (15, 20), (20, float('inf'))]
    bucket_names = ["0-5s", "5-10s", "10-15s", "15-20s", ">20s"]
    test_groups = {name: [] for name in bucket_names}

    with open(file_path, 'r') as file:
        for line in file:
            parts = line.split()
            # Need at least "<duration>s <phase> <name>"; the duration token
            # must carry the 's' unit suffix. (endswith is stricter than the
            # old substring check but accepts exactly the same parseable
            # lines, since any non-numeric remainder fails float() below.)
            if len(parts) < 3 or not parts[0].endswith('s'):
                continue
            # Keep the try body minimal: only the conversion can raise.
            try:
                duration = float(parts[0].rstrip('s'))
            except ValueError:
                continue  # e.g. a word that merely ends in 's'
            test_name = ' '.join(parts[2:])
            # NOTE(review): unlike extract_top_n_tests this does not filter on
            # parts[1] == 'call', so setup/teardown lines are bucketed too —
            # confirm that is intentional.
            for (start, end), bucket_name in zip(buckets, bucket_names):
                if start <= duration < end:
                    test_groups[bucket_name].append((duration, test_name))
                    break

    return test_groups


def extract_top_n_tests(file_path, n=10):
    """Return the *n* slowest 'call'-phase tests from a durations report.

    Only lines whose second token is ``call`` are considered, so setup and
    teardown durations are ignored.

    Args:
        file_path: Path to the pytest ``--durations`` report file.
        n: Number of slowest tests to return (default 10).

    Returns:
        Dict mapping test name to a duration string such as ``"12.3s"``,
        ordered from slowest to fastest.
    """
    test_durations = []

    with open(file_path, 'r') as file:
        for line in file:
            parts = line.split()
            # Guard clause: need "<duration>s call <name...>".
            if len(parts) < 3 or parts[1] != 'call':
                continue
            try:
                duration = float(parts[0].rstrip('s'))  # drop the 's' suffix
            except ValueError:
                continue  # malformed duration token — skip the line
            test_durations.append((duration, ' '.join(parts[2:])))

    # Slowest first; sort is stable so ties keep file order.
    test_durations.sort(key=lambda item: item[0], reverse=True)

    # Dict preserves insertion order, so the result is slowest-to-fastest.
    # (The original used enumerate() here with an unused index.)
    return {name: f"{duration}s" for duration, name in test_durations[:n]}


def fetch_test_duration_artifact(repo_id, token, run_id, artifact_name):
    """Download a GitHub Actions artifact and extract its duration file.

    Looks up ``artifact_name`` among the artifacts of workflow run ``run_id``,
    downloads its zip archive to the current directory, and extracts the first
    zip member whose name contains ``"duration"``.

    Args:
        repo_id: Repository in ``"owner/repo"`` form.
        token: GitHub API token used for authentication.
        run_id: Workflow run identifier.
        artifact_name: Exact name of the artifact to fetch.

    Returns:
        Path (zip member name) of the extracted duration file.

    Raises:
        ValueError: If the artifact or a duration file inside it is missing.
        requests.HTTPError: On failed API responses.
    """
    owner, repo = repo_id.split("/", 1)
    artifacts_url = (f'https://api.github.com/repos/{owner}/{repo}'
                     f'/actions/runs/{run_id}/artifacts')
    headers = {'Authorization': f'token {token}'}

    # List the run's artifacts and locate the one we want.
    response = requests.get(artifacts_url, headers=headers)
    response.raise_for_status()

    download_url = None
    for artifact in response.json().get('artifacts', []):
        if artifact['name'] == artifact_name:
            download_url = artifact['archive_download_url']
            break

    if download_url is None:
        raise ValueError(
            f"Artifact '{artifact_name}' not found in run {run_id}")

    # Stream the artifact zip to disk.
    download_response = requests.get(download_url, headers=headers, stream=True)
    download_response.raise_for_status()

    zip_file_path = f'{artifact_name}.zip'
    with open(zip_file_path, 'wb') as zip_file:
        for chunk in download_response.iter_content(chunk_size=128):
            zip_file.write(chunk)

    # Extract the first member that looks like a duration report.
    # (The original reused the name `file` for both the open() handle and the
    # loop variable, and returned it even when no match was found — returning
    # a closed file object or an arbitrary member name instead of failing.)
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        for member in zip_ref.namelist():
            if "duration" in member:
                zip_ref.extract(member, ".")
                return member

    raise ValueError(
        f"No duration file found inside artifact '{artifact_name}'")
    
def format_to_markdown_str(test_bucket_map, top_n_slow_tests, repo_id, run_id, artifact_name):
    """Render the test-duration analysis as a Markdown report string.

    Args:
        test_bucket_map: Mapping of bucket label to number of tests in it.
        top_n_slow_tests: Mapping of test name to duration string, slowest
            first (as produced by ``extract_top_n_tests``).
        repo_id: Repository in ``"owner/repo"`` form, used for the run URL.
        run_id: Workflow run identifier, used for the run URL.
        artifact_name: Artifact name shown in the report heading.

    Returns:
        The assembled Markdown report.
    """
    run_url = f"https://github.com/{repo_id}/actions/runs/{run_id}/"
    markdown_str = f"""
## Top {len(top_n_slow_tests)} slow test for {artifact_name}\n
"""
    for test, duration in top_n_slow_tests.items():
        # Show only the final path component of the test id.
        markdown_str += f"* {test.split('/')[-1]}: {duration}\n"

    markdown_str += """
## Bucketed durations of the tests\n
"""
    for bucket, num_tests in test_bucket_map.items():
        if ">" in bucket:
            # Escape the leading '>' so Markdown does not render a blockquote.
            # (Was f"\{bucket}" — an invalid escape sequence that emits a
            # SyntaxWarning on Python 3.12+; the output string is unchanged.)
            bucket = f"\\{bucket}"
        markdown_str += f"* {bucket}: {num_tests} tests\n"

    markdown_str += f"\nRun URL: [{run_url}]({run_url})."

    return markdown_str
    

def analyze_tests(repo_id, token, run_id, artifact_name, top_n):
    """Fetch a run's duration artifact and summarise it as a Markdown report."""
    duration_file = fetch_test_duration_artifact(
        repo_id=repo_id, token=token, run_id=run_id, artifact_name=artifact_name
    )

    # Count how many tests landed in each duration bucket.
    bucket_counts = {
        name: len(members)
        for name, members in group_tests_by_duration(duration_file).items()
    }
    print(bucket_counts)

    slowest = extract_top_n_tests(duration_file, n=top_n)
    print(slowest)

    return format_to_markdown_str(
        bucket_counts, slowest, repo_id, run_id, artifact_name
    )