# llm_io_intelligence.py
import os
import json
import logging
import base64
from typing import Optional, List, Dict, Any, Union, Iterator
import asyncio
import aiohttp
import llm
from pathlib import Path
from datetime import datetime, timedelta
import hashlib
import mimetypes
import queue
import threading
# Configure logging to be less verbose - only warnings and errors
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)

async def fetch_available_models(api_key: str) -> List[tuple]:
    """Fetch available models from the IO Intelligence API"""
    api_base = "https://api.intelligence.io.solutions/api/v1"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(f"{api_base}/models", headers=headers) as response:
                if response.status == 200:
                    data = await response.json()
                    models = []
                    # Parse the response to extract model information.
                    # Entries only expose an "id" field, so it doubles as the
                    # display name and the context length falls back to a default.
                    model_list = data.get("data", [])
                    for model_data in model_list:
                        model_id = model_data.get("id")
                        full_name = model_data.get("id")  # ID doubles as full name; no name/full_name field
                        context_length = 32000  # Default to 32K; no context_length field
                        if model_id and full_name:
                            # Prepend "ionet/" to the model ID
                            model_id = f"ionet/{model_id}"
                            models.append((model_id, full_name, context_length))
                    return models
                else:
                    logger.warning(f"Failed to fetch models: {response.status}")
                    return []
        except Exception as e:
            logger.warning(f"Error fetching models from API: {e}")
            return []
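
# For reference, the /models endpoint is assumed to return an OpenAI-style
# listing; a hypothetical response (field values illustrative, not confirmed
# against the live API):
#
#   {"object": "list",
#    "data": [{"id": "meta-llama/Llama-3.3-70B-Instruct", "object": "model"}]}
#
# fetch_available_models() turns each entry into ("ionet/<id>", "<id>", 32000).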

@llm.hookimpl
def register_models(register):
    logger.debug("Registering io intelligence models")
    # Try to get API key from LLM key system
    api_key = None
    try:
        if hasattr(llm, 'get_key'):
            api_key = llm.get_key(None, "ionet", None)
        # If no API key from LLM key system, try environment variable
        if not api_key:
            api_key = os.environ.get("IONET")
    except Exception as e:
        logger.warning(f"Error getting API key: {e}")
        # Fall back to environment variable
        api_key = os.environ.get("IONET")

    # Try to fetch models from API if API key is available
    models = []
    if api_key:
        logger.debug("Attempting to fetch models from API")
        try:
            # Run the async function in a new event loop
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
            models = loop.run_until_complete(fetch_available_models(api_key))
            logger.debug(f"Successfully fetched {len(models)} models from API")
        except Exception as e:
            logger.warning(f"Failed to fetch models from API: {e}")
            # If API fetch fails, don't fall back to hardcoded models when API key is present.
            # This ensures we only use API models when an API key is set.
            models = []
    else:
        logger.debug("No API key present, will use hardcoded models")

    # Fallback to hardcoded models only if no API key is present
    if not api_key and not models:
        logger.debug("Using hardcoded model list")
        models = [
            # Current models from API
            ("ionet/llama-4-maverick-17b", "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8", 430000),
            ("ionet/deepseek-r1-0528", "deepseek-ai/DeepSeek-R1-0528", 128000),
            ("ionet/intel-qwen3-coder-480b", "Intel/Qwen3-Coder-480B-A35B-Instruct-int4-mixed-ar", 32000),
            ("ionet/qwen3-235b", "Qwen/Qwen3-235B-A22B-FP8", 32000),
            ("ionet/llama-3.2-90b-vision", "meta-llama/Llama-3.2-90B-Vision-Instruct", 16000),
            ("ionet/qwen2.5-vl-32b", "Qwen/Qwen2.5-VL-32B-Instruct", 32000),
            ("ionet/llama-3.3-70b", "meta-llama/Llama-3.3-70B-Instruct", 128000),
            ("ionet/devstral-small-2505", "mistralai/Devstral-Small-2505", 32000),
            ("ionet/magistral-small-2506", "mistralai/Magistral-Small-2506", 32000),
            ("ionet/mistral-large-2411", "mistralai/Mistral-Large-Instruct-2411", 128000),
            ("ionet/aya-expanse-32b", "CohereForAI/aya-expanse-32b", 8000),
        ]

    logger.debug(f"Registering {len(models)} models")
    for model_id, full_name, context_length in models:
        logger.debug(f"Registering model: {model_id} ({full_name})")
        model = IOIntelligenceModel(model_id, full_name, context_length)
        register(model)
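
# Once registered, the models are addressable through the llm CLI by their
# "ionet/"-prefixed IDs, for example (model choice illustrative):
#
#   llm keys set ionet
#   llm -m ionet/llama-3.3-70b "Explain asyncio event loops"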

class IOIntelligenceModel(llm.Model):
    can_stream = True
    supports_tools = True
    attachment_types = {"image/jpeg", "image/png", "image/gif", "image/webp"}

    def __init__(self, model_id: str, full_model_name: str, context_length: Optional[int] = None):
        self.model_id = model_id
        self.full_model_name = full_model_name
        self.context_length = context_length
        self.api_base = "https://api.intelligence.io.solutions/api/v1"
        logger.debug(f"Initialized model {model_id} with context length {context_length}")
        # Set the model ID for llm framework
        self.__dict__["model_id"] = model_id

    def __str__(self):
        return f"IOIntelligenceModel: {self.model_id}"

    def build_messages(self, prompt, conversation) -> List[Dict[str, Any]]:
        messages = []
        if conversation:
            for prev_response in conversation.responses:
                messages.extend(prev_response.request.messages)
        # Add the current prompt
        messages.append({"role": "user", "content": prompt.prompt})
        return messages
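
    # build_messages() produces an OpenAI-style message list; a fresh
    # conversation yields simply (content illustrative):
    #
    #   [{"role": "user", "content": "Hello"}]
    #
    # Earlier turns are replayed verbatim from each stored response's
    # request.messages, so prior system/assistant roles carry over unchanged.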

    async def execute_async_with_tools(self, prompt, tools=None, get_env_var=None, stream=False):
        """Execute the model asynchronously with tool support"""
        # Try to get API key
        api_key = None
        if get_env_var:
            api_key = get_env_var("ionet")
        else:
            # Try to get API key from LLM key system
            try:
                if hasattr(llm, 'get_key'):
                    api_key = llm.get_key(None, "ionet", None)
                # If no API key from LLM key system, try environment variable
                if not api_key:
                    api_key = os.environ.get("IONET")
            except Exception as e:
                logger.warning(f"Error getting API key: {e}")
                # Fall back to environment variable
                api_key = os.environ.get("IONET")
        if not api_key:
            raise ValueError("IONET key is required. Set it with 'llm keys set ionet' or the IONET environment variable.")

        # Build messages
        messages = self.build_messages(prompt, prompt.conversation if hasattr(prompt, 'conversation') else None)

        # Prepare the request payload
        payload = {
            "model": self.full_model_name,
            "messages": messages,
            "stream": stream,
            "tools": tools if tools else [],
        }
        # Add attachments if present
        attachments = getattr(prompt, 'attachments', [])
        if attachments:
            payload["attachments"] = self._process_attachments(attachments)

        logger.debug(f"Sending request to {self.api_base}/chat/completions with model {self.full_model_name}")
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        async with aiohttp.ClientSession() as session:
            try:
                async with session.post(
                    f"{self.api_base}/chat/completions",
                    headers=headers,
                    json=payload
                ) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        logger.error(f"API request failed with status {response.status}: {error_text}")
                        raise Exception(f"API request failed: {response.status} - {error_text}")
                    if stream:
                        # Handle streaming response - parse SSE format
                        buffer = ""
                        done = False
                        async for chunk in response.content:
                            if chunk:
                                buffer += chunk.decode('utf-8')
                                # Process complete lines
                                while '\n' in buffer:
                                    line, buffer = buffer.split('\n', 1)
                                    line = line.strip()
                                    # Skip empty lines and non-data lines
                                    if not line or not line.startswith('data:'):
                                        continue
                                    # Extract the data part
                                    data_str = line[5:].strip()  # Remove 'data:' prefix
                                    # Check for end of stream; set a flag so the
                                    # outer read loop stops as well
                                    if data_str == '[DONE]':
                                        done = True
                                        break
                                    try:
                                        # Parse the JSON data
                                        data = json.loads(data_str)
                                        # Extract content from choices
                                        if 'choices' in data and len(data['choices']) > 0:
                                            choice = data['choices'][0]
                                            if 'delta' in choice and 'content' in choice['delta']:
                                                content = choice['delta']['content']
                                                if content:
                                                    yield content
                                    except json.JSONDecodeError:
                                        # Skip invalid JSON
                                        continue
                            if done:
                                break
                    else:
                        # Handle non-streaming response
                        result = await response.json()
                        logger.debug(f"Received response: {result}")
                        # Extract the content and tool calls
                        choice = result["choices"][0]
                        message = choice["message"]
                        # Handle tool calls if present
                        if "tool_calls" in message and message["tool_calls"]:
                            return_message = {
                                "content": message.get("content", ""),
                                "tool_calls": message["tool_calls"]
                            }
                        else:
                            return_message = {
                                "content": message.get("content", ""),
                                "tool_calls": []
                            }
                        # Yield the result for non-streaming mode
                        yield return_message
            except aiohttp.ClientError as e:
                logger.error(f"Network error during API request: {e}")
                raise Exception(f"Network error: {e}")
            except json.JSONDecodeError as e:
                logger.error(f"Failed to decode JSON response: {e}")
                raise Exception(f"Invalid JSON response: {e}")
            except KeyError as e:
                logger.error(f"Missing expected key in response: {e}")
                raise Exception(f"Invalid response format: missing key {e}")

    def _process_attachments(self, attachments) -> List[Dict[str, str]]:
        """Process attachments and convert them to base64 encoded strings"""
        processed_attachments = []
        for attachment in attachments:
            # Determine MIME type
            mime_type = attachment.mime_type or mimetypes.guess_type(attachment.path)[0] or "application/octet-stream"
            # Read and encode the file
            try:
                with open(attachment.path, "rb") as f:
                    encoded_content = base64.b64encode(f.read()).decode('utf-8')
                processed_attachments.append({
                    "type": "base64",
                    "media_type": mime_type,
                    "data": encoded_content
                })
            except Exception as e:
                logger.warning(f"Failed to process attachment {attachment.path}: {e}")
        return processed_attachments
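
    # A processed attachment entry has the shape (data truncated for
    # illustration):
    #
    #   {"type": "base64", "media_type": "image/png", "data": "iVBORw0KGgo..."}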

    def execute(self, prompt, stream: bool, response, conversation=None):
        """Synchronous wrapper for async execution"""
        # Store the prompt JSON for debugging
        messages = self.build_messages(prompt, conversation)
        response._prompt_json = {"messages": messages}
        # Run the async method in a new event loop
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        if stream:
            # Handle streaming - yield chunks in real-time
            def sync_stream():
                # Create queues to handle async-to-sync streaming
                chunk_queue = queue.Queue()
                exception_queue = queue.Queue()
                done_queue = queue.Queue()

                # Producer function that runs in a separate thread
                def producer():
                    async def async_producer():
                        try:
                            async for chunk in self.execute_async_with_tools(prompt, stream=True):
                                chunk_queue.put(chunk)
                        except Exception as e:
                            exception_queue.put(e)
                        finally:
                            done_queue.put(True)
                    # Run the async producer
                    loop.run_until_complete(async_producer())

                # Start the producer in a separate thread
                producer_thread = threading.Thread(target=producer)
                producer_thread.start()

                # Consumer - yield chunks as they arrive
                while True:
                    try:
                        # Check for exceptions first
                        try:
                            exc = exception_queue.get_nowait()
                            raise exc
                        except queue.Empty:
                            pass
                        # Try to get a chunk with a timeout
                        try:
                            chunk = chunk_queue.get(timeout=0.1)
                            yield chunk
                        except queue.Empty:
                            # Check if we're done
                            try:
                                done_queue.get_nowait()
                                break
                            except queue.Empty:
                                # Still waiting, continue
                                continue
                    except Exception as e:
                        # Make sure to join the thread before re-raising
                        producer_thread.join()
                        raise e
                # Wait for the producer thread to finish
                producer_thread.join()

            return sync_stream()
        else:
            # Handle non-streaming
            result_generator = self.execute_async_with_tools(prompt, stream=False)
            try:
                # Get the first (and only) item from the generator
                result = loop.run_until_complete(result_generator.__anext__())
            finally:
                # Close the generator so the aiohttp session is released
                loop.run_until_complete(result_generator.aclose())
            content = result["content"]
            # Handle tool calls if present
            if result["tool_calls"]:
                for tool_call in result["tool_calls"]:
                    response.add_tool_call(
                        llm.ToolCall(
                            name=tool_call["function"]["name"],
                            arguments=tool_call["function"]["arguments"],
                        )
                    )
            # Return the content as an iterator
            return iter([content])
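
    # Design note: the streaming path bridges the async generator to llm's
    # synchronous iterator protocol via a producer thread plus polling queues;
    # the 0.1 s timeout on chunk_queue.get() is what keeps the consumer loop
    # responsive while the network thread fills the queue.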

    async def execute_async(self, prompt, get_env_var=None):
        """Async execution without tools for compatibility"""
        result_generator = self.execute_async_with_tools(prompt, get_env_var=get_env_var, stream=False)
        try:
            # Get the first (and only) item from the generator
            result = await result_generator.__anext__()
        finally:
            # Close the generator so the aiohttp session is released
            await result_generator.aclose()
        return result["content"]