File size: 4,210 Bytes
6e648e8
 
 
 
 
 
04eb088
 
6e648e8
 
 
 
 
 
 
 
 
c9e8b19
c255c63
6e648e8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import uuid
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from jupyter_client import KernelManager
from threading import Lock
import asyncio
from typing import Optional


# FastAPI application object; the route decorators below register handlers on it.
app = FastAPI()

# Registry of kernel sessions keyed by session_id. Each value is a dict of the
# form {'km': <KernelManager>, 'kc': <kernel client>} as stored by create_session.
kernel_sessions = {}
# Lock taken around every read and write of the kernel_sessions dictionary
sessions_lock = Lock()

class CodeExecutionRequest(BaseModel):
    """Request body accepted by the /execute/{session_id} endpoint."""
    # Source text handed to the session's kernel for execution.
    code: str
    # Optional flag, defaults to False; kept in the schema for backward compatibility.
    restart: Optional[bool] = False

class CreateSessionResponse(BaseModel):
    """Response body returned by the /create_session endpoint."""
    # Identifier of the newly created session (a UUID4 rendered as a string).
    session_id: str

@app.post("/create_session", response_model=CreateSessionResponse)
async def create_session():
    """
    Creates a new Jupyter kernel session and returns the session_id.

    A ``python3`` kernel is started and a client with open channels is attached
    to it; both are then registered in ``kernel_sessions`` under a fresh UUID.

    Returns:
        CreateSessionResponse: carries the generated ``session_id``.

    Raises:
        HTTPException: 500 if the kernel or its client channels fail to start.
    """
    session_id = str(uuid.uuid4())  # Generate a unique session ID

    def start_session():
        # Create a new kernel manager and start a kernel
        km = KernelManager()
        km.kernel_name = 'python3'
        km.start_kernel()
        try:
            # Create a client for interacting with the kernel
            kc = km.client()
            kc.start_channels()
        except Exception:
            # The kernel process is already running at this point; stop it so a
            # failed client setup does not leave an orphaned kernel behind.
            km.shutdown_kernel(now=True)
            raise
        return km, kc

    try:
        # Kernel startup is blocking work, so run it in the default executor
        # (as execute_code does) instead of stalling the event loop.
        loop = asyncio.get_running_loop()
        km, kc = await loop.run_in_executor(None, start_session)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    # Hold the lock only for the dictionary update, not for the slow startup.
    with sessions_lock:
        kernel_sessions[session_id] = {'km': km, 'kc': kc}

    return CreateSessionResponse(session_id=session_id)

@app.post("/execute/{session_id}")
async def execute_code(session_id: str, request: CodeExecutionRequest):
    """
    Executes code in the specified session's Jupyter kernel.

    Args:
        session_id: Identifier previously returned by /create_session.
        request: Body whose ``code`` field is sent to the kernel. The
            ``restart`` field is accepted but not acted on by this handler.

    Returns:
        dict: ``{"status": "success", "output": <str>}`` where the output is
        the newline-joined stream text, ``text/plain`` results and traceback
        lines produced by this execution.

    Raises:
        HTTPException: 404 if the session does not exist; 500 if the worker
            running the execution raises.
    """
    # Local import: get_iopub_msg signals a poll timeout by raising queue.Empty.
    from queue import Empty

    with sessions_lock:
        session = kernel_sessions.get(session_id)

    if not session:
        raise HTTPException(status_code=404, detail="Session not found")

    km = session['km']
    kc = session['kc']

    loop = asyncio.get_running_loop()

    # This function will run in a separate thread to avoid blocking
    def run_code():
        # Keep the request's msg_id so only its own replies are collected.
        # stdin is disabled because nothing here can answer an input() prompt;
        # the kernel then reports an error instead of waiting forever.
        msg_id = kc.execute(request.code, allow_stdin=False)

        # Collect output messages from the iopub channel
        output = []
        while True:
            try:
                msg = kc.get_iopub_msg(timeout=2)

                # Skip messages that belong to other requests (e.g. kernel
                # start-up status or leftovers from an earlier execution);
                # otherwise a stale 'idle' would end collection too early.
                parent = msg.get('parent_header') or {}
                if parent.get('msg_id') != msg_id:
                    continue

                msg_type = msg['msg_type']

                # Process different types of iopub messages
                if msg_type == 'stream':
                    output.append(msg['content']['text'])
                elif msg_type == 'error':
                    # Include traceback if there's an error
                    output.extend(msg['content']['traceback'])
                elif msg_type in ['execute_result', 'display_data']:
                    # Capture the output result if it exists
                    output.append(msg['content']['data'].get('text/plain', ''))

                # Exit when execution completes
                if msg_type == 'status' and msg['content']['execution_state'] == 'idle':
                    break

            except Empty:
                # A quiet 2-second window only means the code is still running;
                # keep polling unless the kernel itself has gone away.
                if km.is_alive():
                    continue
                output.append("Error capturing output: kernel is no longer running")
                break
            except Exception as e:
                output.append(f"Error capturing output: {str(e)}")
                break

        return "\n".join(output)

    try:
        # Execute the code and await the result asynchronously
        output = await loop.run_in_executor(None, run_code)
        return {"status": "success", "output": output}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

@app.post("/shutdown/{session_id}")
async def shutdown_session(session_id: str):
    """
    Shuts down the Jupyter kernel associated with the specified session_id.

    Args:
        session_id: Identifier previously returned by /create_session.

    Returns:
        dict: ``{"status": "success", "message": "Session terminated"}``.

    Raises:
        HTTPException: 404 if the session does not exist; 500 if stopping the
            channels or shutting down the kernel raises.
    """
    with sessions_lock:
        session = kernel_sessions.pop(session_id, None)

    if not session:
        raise HTTPException(status_code=404, detail="Session not found")

    def teardown():
        # The session has already been removed from the registry, so this is
        # the only chance to stop the kernel: shut it down even when closing
        # the client channels fails.
        try:
            session['kc'].stop_channels()
        finally:
            session['km'].shutdown_kernel()

    # Stop the kernel and clean up resources
    try:
        # Teardown is blocking work; run it in the default executor (as
        # execute_code does) rather than on the event loop thread.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, teardown)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    return {"status": "success", "message": "Session terminated"}

@app.get("/list_sessions")
async def list_sessions():
    """
    Reports the identifiers of every session currently in the registry.

    Returns:
        dict: ``{"status": "success", "sessions": [{"session_id": ...}, ...]}``
        with one entry per key of ``kernel_sessions``.
    """
    # Snapshot the ids while holding the lock; build the payload afterwards.
    with sessions_lock:
        active_ids = list(kernel_sessions)

    entries = []
    for active_id in active_ids:
        entries.append({"session_id": active_id})

    return {"status": "success", "sessions": entries}