1
1
import asyncio
2
2
import json
3
+ import logging
3
4
import os
4
5
import sys
5
6
import tempfile
12
13
13
14
from cycode .cli .cli_types import McpTransportOption , ScanTypeOption
14
15
from cycode .cli .utils .sentry import add_breadcrumb
16
+ from cycode .logger import LoggersManager , get_logger
15
17
16
18
try :
17
19
from mcp .server .fastmcp import FastMCP
22
24
) from None
23
25
24
26
25
- from cycode .logger import get_logger
26
-
27
27
_logger = get_logger ('Cycode MCP' )
28
28
29
29
_DEFAULT_RUN_COMMAND_TIMEOUT = 5 * 60
30
30
31
31
_FILES_TOOL_FIELD = Field (description = 'Files to scan, mapping file paths to their content' )
32
32
33
33
34
+ def _is_debug_mode () -> bool :
35
+ return LoggersManager .global_logging_level == logging .DEBUG
36
+
37
+
38
+ def _gen_random_id () -> str :
39
+ return uuid .uuid4 ().hex
40
+
41
+
34
42
def _get_current_executable () -> str :
35
43
"""Get the current executable path for spawning subprocess."""
36
44
if getattr (sys , 'frozen' , False ): # pyinstaller bundle
@@ -49,7 +57,8 @@ async def _run_cycode_command(*args: str, timeout: int = _DEFAULT_RUN_COMMAND_TI
49
57
Returns:
50
58
Dictionary containing the parsed JSON result or error information
51
59
"""
52
- cmd_args = [_get_current_executable (), '-o' , 'json' , * list (args )]
60
+ verbose = ['-v' ] if _is_debug_mode () else []
61
+ cmd_args = [_get_current_executable (), * verbose , '-o' , 'json' , * list (args )]
53
62
_logger .debug ('Running Cycode CLI command: %s' , ' ' .join (cmd_args ))
54
63
55
64
try :
@@ -61,6 +70,9 @@ async def _run_cycode_command(*args: str, timeout: int = _DEFAULT_RUN_COMMAND_TI
61
70
stdout_str = stdout .decode ('UTF-8' , errors = 'replace' ) if stdout else ''
62
71
stderr_str = stderr .decode ('UTF-8' , errors = 'replace' ) if stderr else ''
63
72
73
+ if _is_debug_mode (): # redirect debug output
74
+ sys .stderr .write (stderr_str )
75
+
64
76
if not stdout_str :
65
77
return {'error' : 'No output from command' , 'stderr' : stderr_str , 'returncode' : process .returncode }
66
78
@@ -87,7 +99,7 @@ def _create_temp_files(files_content: dict[str, str]) -> list[str]:
87
99
_logger .debug ('Creating temporary files in directory: %s' , temp_dir )
88
100
89
101
for file_path , content in files_content .items ():
90
- safe_filename = f'{ uuid . uuid4 (). hex } _{ Path (file_path ).name } '
102
+ safe_filename = f'{ _gen_random_id () } _{ Path (file_path ).name } '
91
103
temp_file_path = os .path .join (temp_dir , safe_filename )
92
104
93
105
os .makedirs (os .path .dirname (temp_file_path ), exist_ok = True )
@@ -125,15 +137,39 @@ def _cleanup_temp_files(temp_files: list[str]) -> None:
125
137
126
138
async def _run_cycode_scan (scan_type : ScanTypeOption , temp_files : list [str ]) -> dict [str , Any ]:
127
139
"""Run cycode scan command and return the result."""
128
- args = ['scan' , '-t' , str (scan_type ), 'path' , * temp_files ]
129
- return await _run_cycode_command (* args )
140
+ return await _run_cycode_command (* ['scan' , '-t' , str (scan_type ), 'path' , * temp_files ])
130
141
131
142
132
143
async def _run_cycode_status () -> dict [str , Any ]:
133
144
"""Run cycode status command and return the result."""
134
145
return await _run_cycode_command ('status' )
135
146
136
147
148
+ async def _cycode_scan_tool (scan_type : ScanTypeOption , files : dict [str , str ] = _FILES_TOOL_FIELD ) -> str :
149
+ _tool_call_id = _gen_random_id ()
150
+ _logger .info ('Scan tool called, %s' , {'scan_type' : scan_type , 'call_id' : _tool_call_id })
151
+
152
+ if not files :
153
+ _logger .error ('No files provided for scan' )
154
+ return json .dumps ({'error' : 'No files provided' })
155
+
156
+ temp_files = _create_temp_files (files )
157
+
158
+ try :
159
+ _logger .info (
160
+ 'Running Cycode scan, %s' ,
161
+ {'scan_type' : scan_type , 'files_count' : len (temp_files ), 'call_id' : _tool_call_id },
162
+ )
163
+ result = await _run_cycode_scan (scan_type , temp_files )
164
+ _logger .info ('Scan completed, %s' , {'scan_type' : scan_type , 'call_id' : _tool_call_id })
165
+ return json .dumps (result , indent = 2 )
166
+ except Exception as e :
167
+ _logger .error ('Scan failed, %s' , {'scan_type' : scan_type , 'call_id' : _tool_call_id , 'error' : str (e )})
168
+ return json .dumps ({'error' : f'Scan failed: { e !s} ' }, indent = 2 )
169
+ finally :
170
+ _cleanup_temp_files (temp_files )
171
+
172
+
137
173
async def cycode_secret_scan (files : dict [str , str ] = _FILES_TOOL_FIELD ) -> str :
138
174
"""Scan files for hardcoded secrets.
139
175
@@ -148,18 +184,7 @@ async def cycode_secret_scan(files: dict[str, str] = _FILES_TOOL_FIELD) -> str:
148
184
Returns:
149
185
JSON string containing scan results and any secrets found
150
186
"""
151
- _logger .info ('Secret scan tool called' )
152
-
153
- if not files :
154
- return json .dumps ({'error' : 'No files provided' })
155
-
156
- temp_files = _create_temp_files (files )
157
-
158
- try :
159
- result = await _run_cycode_scan (ScanTypeOption .SECRET , temp_files )
160
- return json .dumps (result , indent = 2 )
161
- finally :
162
- _cleanup_temp_files (temp_files )
187
+ return await _cycode_scan_tool (ScanTypeOption .SECRET , files )
163
188
164
189
165
190
async def cycode_sca_scan (files : dict [str , str ] = _FILES_TOOL_FIELD ) -> str :
@@ -178,18 +203,7 @@ async def cycode_sca_scan(files: dict[str, str] = _FILES_TOOL_FIELD) -> str:
178
203
Returns:
179
204
JSON string containing scan results, vulnerabilities, and license issues found
180
205
"""
181
- _logger .info ('SCA scan tool called' )
182
-
183
- if not files :
184
- return json .dumps ({'error' : 'No files provided' })
185
-
186
- temp_files = _create_temp_files (files )
187
-
188
- try :
189
- result = await _run_cycode_scan (ScanTypeOption .SCA , temp_files )
190
- return json .dumps (result , indent = 2 )
191
- finally :
192
- _cleanup_temp_files (temp_files )
206
+ return await _cycode_scan_tool (ScanTypeOption .SCA , files )
193
207
194
208
195
209
async def cycode_iac_scan (files : dict [str , str ] = _FILES_TOOL_FIELD ) -> str :
@@ -208,18 +222,7 @@ async def cycode_iac_scan(files: dict[str, str] = _FILES_TOOL_FIELD) -> str:
208
222
Returns:
209
223
JSON string containing scan results and any misconfigurations found
210
224
"""
211
- _logger .info ('IaC scan tool called' )
212
-
213
- if not files :
214
- return json .dumps ({'error' : 'No files provided' })
215
-
216
- temp_files = _create_temp_files (files )
217
-
218
- try :
219
- result = await _run_cycode_scan (ScanTypeOption .IAC , temp_files )
220
- return json .dumps (result , indent = 2 )
221
- finally :
222
- _cleanup_temp_files (temp_files )
225
+ return await _cycode_scan_tool (ScanTypeOption .IAC , files )
223
226
224
227
225
228
async def cycode_sast_scan (files : dict [str , str ] = _FILES_TOOL_FIELD ) -> str :
@@ -238,18 +241,7 @@ async def cycode_sast_scan(files: dict[str, str] = _FILES_TOOL_FIELD) -> str:
238
241
Returns:
239
242
JSON string containing scan results and any security flaws found
240
243
"""
241
- _logger .info ('SAST scan tool called' )
242
-
243
- if not files :
244
- return json .dumps ({'error' : 'No files provided' })
245
-
246
- temp_files = _create_temp_files (files )
247
-
248
- try :
249
- result = await _run_cycode_scan (ScanTypeOption .SAST , temp_files )
250
- return json .dumps (result , indent = 2 )
251
- finally :
252
- _cleanup_temp_files (temp_files )
244
+ return await _cycode_scan_tool (ScanTypeOption .SAST , files )
253
245
254
246
255
247
async def cycode_status () -> str :
@@ -265,13 +257,21 @@ async def cycode_status() -> str:
265
257
Returns:
266
258
JSON string containing CLI status, version, and configuration details
267
259
"""
260
+ _tool_call_id = _gen_random_id ()
268
261
_logger .info ('Status tool called' )
269
262
270
- result = await _run_cycode_status ()
271
- return json .dumps (result , indent = 2 )
263
+ try :
264
+ _logger .info ('Running Cycode status check, %s' , {'call_id' : _tool_call_id })
265
+ result = await _run_cycode_status ()
266
+ _logger .info ('Status check completed, %s' , {'call_id' : _tool_call_id })
267
+
268
+ return json .dumps (result , indent = 2 )
269
+ except Exception as e :
270
+ _logger .error ('Status check failed, %s' , {'call_id' : _tool_call_id , 'error' : str (e )})
271
+ return json .dumps ({'error' : f'Status check failed: { e !s} ' }, indent = 2 )
272
272
273
273
274
- def _create_mcp_server (host : str = '127.0.0.1' , port : int = 8000 ) -> FastMCP :
274
+ def _create_mcp_server (host : str , port : int ) -> FastMCP :
275
275
"""Create and configure the MCP server."""
276
276
tools = [
277
277
Tool .from_function (cycode_status ),
@@ -281,7 +281,14 @@ def _create_mcp_server(host: str = '127.0.0.1', port: int = 8000) -> FastMCP:
281
281
Tool .from_function (cycode_sast_scan ),
282
282
]
283
283
_logger .info ('Creating MCP server with tools: %s' , [tool .name for tool in tools ])
284
- return FastMCP ('cycode' , tools = tools , host = host , port = port )
284
+ return FastMCP (
285
+ 'cycode' ,
286
+ tools = tools ,
287
+ host = host ,
288
+ port = port ,
289
+ debug = _is_debug_mode (),
290
+ log_level = 'DEBUG' if _is_debug_mode () else 'INFO' ,
291
+ )
285
292
286
293
287
294
def _run_mcp_server (transport : McpTransportOption , host : str , port : int ) -> None :
0 commit comments