Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Block implementation #46

Open
wants to merge 1 commit into
base: version-1.0.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions superpilot/app/client_lib/logging.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import logging


def get_client_logger():
def get_client_logger(logger_name: str = "application_logger"):
# Configure logging before we do anything else.
# Application logs need a place to live.
client_logger = logging.getLogger("superpilot_client_application")
client_logger = logging.getLogger(logger_name)
client_logger.setLevel(logging.DEBUG)

formatter = logging.Formatter(
Expand Down
3 changes: 3 additions & 0 deletions superpilot/core/block/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@



156 changes: 156 additions & 0 deletions superpilot/core/block/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
import abc
import logging
from pathlib import Path
import inflection
from superpilot.core.plugin.base import PluginLocation, PluginStorageFormat
from superpilot.core.resource.model_providers import (
ModelProviderName,
OpenAIModelName,
)
from typing import ClassVar, List
from superpilot.core.configuration.schema import (
SystemConfiguration,
UserConfigurable
)
from pydantic import Field
from pprint import pformat
from typing import Dict, Union, Any
import json, enum


class BlockType(str, enum.Enum):
LLM = "llm"


class BlockConfiguration(SystemConfiguration):
"""Struct for model configuration."""
location: PluginLocation
packages_required: List[str] = Field(default_factory=list)
block_type: str = UserConfigurable()
metadata: Dict = UserConfigurable()
seq_order: int = UserConfigurable()
input_schema: Dict = UserConfigurable()
output_schema: Dict = UserConfigurable()
body: str = UserConfigurable()
id: int = UserConfigurable()

@classmethod
def factory(
cls,
block_data: Dict[str, Any],
) -> "BlockConfiguration":
return BlockConfiguration(
id=block_data['id'],
location=block_data['location'],
block_type=block_data['block_type'],
metadata=block_data['metadata'],
seq_order=block_data['seq_order'],
input_schema=block_data['input_schema'],
output_schema=block_data['output_schema'],
body=block_data['body']
)


class Block(abc.ABC):
"""A class representing an pilot block."""

default_configuration: ClassVar[BlockConfiguration]

_summary: str = None

@classmethod
def name(cls) -> str:
"""The name of the block."""
return inflection.underscore(cls.__name__)

@classmethod
@abc.abstractmethod
def description(cls) -> str:
"""A detailed description of what the block does."""
return ""

@classmethod
@abc.abstractmethod
def arguments(cls) -> dict:
"""A dict of arguments in standard json schema format."""
...

@property
@abc.abstractmethod
def config(self) -> BlockConfiguration:
"""A dict of config in standard json schema format."""
...

@classmethod
def required_arguments(cls) -> List[str]:
"""A list of required arguments."""
return []

@property
def summary(self) -> str:
"""A summary of the block result."""
return self._summary

@abc.abstractmethod
async def __call__(self, *args, **kwargs):
...

def __str__(self) -> str:
return pformat(self.dump)

@staticmethod
def _parse_response(response_content: dict) -> dict:
return {"content": response_content["content"]}

def dump(self) -> dict:
return {
"name": self.name(),
# "description": self.description(),
"parameters": {
"type": "object",
# "properties": self.config(),
},
}

@classmethod
def create_block(
cls,
block_type: type, # Assuming you pass the Class itself
logger: logging.Logger,
configuration: BlockConfiguration,
) -> "Block":
# Instantiate and return Block
return block_type(logger=logger, configuration=configuration)


class BlockRegistry(abc.ABC):
@abc.abstractmethod
def register_block(
self, block_name: str, block_configuration: BlockConfiguration
) -> None:
...

@abc.abstractmethod
def list_blocks(self) -> List[str]:
...

@abc.abstractmethod
def blocks(self) -> List[Block]:
...

@abc.abstractmethod
def dump_blocks(self) -> List[dict]:
...

@abc.abstractmethod
def get_block(self, block_name: str) -> Block:
...

@abc.abstractmethod
async def perform(self, block_name: str, block_args: dict, **kwargs):
...


class BlockException(Exception):
"""Base exception for the block subsystem."""
pass
3 changes: 3 additions & 0 deletions superpilot/core/block/execution/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@



31 changes: 31 additions & 0 deletions superpilot/core/block/execution/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import abc
import logging
from pathlib import Path
import inflection


class BaseExecutor(abc.ABC):
@abc.abstractmethod
def __init__(self, *args, **kwargs):
...

@abc.abstractmethod
async def execute(self, *args, **kwargs):
...

# @abc.abstractmethod
# async def observe(self, *args, **kwargs):
# ...

@abc.abstractmethod
def __repr__(self):
...

@classmethod
def name(cls) -> str:
"""The name of the ability."""
return inflection.underscore(cls.__name__)

@abc.abstractmethod
def dump(self):
...
127 changes: 127 additions & 0 deletions superpilot/core/block/execution/simple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import asyncio
import logging
import platform
import time
from typing import List, Dict, Any
import distro
import inflection
import json

from superpilot.core.block.base import BlockRegistry, Block
from superpilot.core.block.execution.base import BaseExecutor
from superpilot.core.pilot.settings import ExecutionNature


class SimpleExecutor(BaseExecutor):

default_configuration = {}

def __init__(
self,
block_registry: BlockRegistry,
logger: logging.Logger = logging.getLogger(__name__),
) -> None:
self._logger = logger
self._block_registry = block_registry
self._execution_nature = ExecutionNature.SEQUENTIAL
self._execution_state: Dict[str, Any] = {}

# async def interaction_handler(self):
async def execute(self, **kwargs):
"""Execute the task."""
self._logger.debug(f"Executing task:")
return await self.exec_blocks(**kwargs)

async def exec_blocks(self, **kwargs) -> None:
self._execution_state = {"global": kwargs}
# Execute for Sequential nature
for block in self._block_registry.blocks():
try:
if block.config.block_type == 'form':
block_input = kwargs
else:
block_input = self._prepare_block_input(block, block.config.input_schema)
response = await self._block_registry.perform(block.name(), **block_input)
self._update_execution_state(block, response)
self._logger.debug(f"Response from block {block.name()}: {response}")
self._logger.debug(f"Execution State {self._execution_state}")
except Exception as e:
self._logger.error(f"Error executing block {block.name()}: {e}")
raise e
return self._execution_state

def get_nested_val(self, object, nesting):
for key in nesting.split('.'):
if key in object:
object = object[key]
else:
return None
return object

def _prepare_block_input(self, block: Block, properties) -> Dict[str, Any]:
block_input = {}
print("Block", block.name())
print("Schema", properties)
for input_key, input_schema in properties.items():
# if 'reference' in value:
# reference_path = value['reference'].split('.')
# value_from_previous = self.get_value_from_nested_dict(previous_input_schema, reference_path[1:])
print("Input key", input_key, input_schema)
if input_schema.get('properties'):
block_input[input_key] = self._prepare_block_input(block, input_schema.get('properties'))
elif input_schema.get('reference'):
block_id, nesting = input_schema['reference'].split('.', 1)
block_input[input_key] = self.get_nested_val(self._execution_state[block_id], nesting)
elif input_schema.get('value'):
block_input[input_key] = input_schema['value']
else:
block_input[input_key] = None
print("Input Value", block_input[input_key])
print("Block Input", block_input)
return block_input

def _update_execution_state(self, block: Block, response: Dict[str, Any]) -> None:
self._execution_state[f"block_{block.config.id}"] = response
# for output_key in block.config.output_schema.keys():
# if output_key in response:
# self._execution_state[f"block_{block.config.id}"][output_key] = response[output_key]
# else:
# self._logger.warning(f"Expected output '{output_key}' not found in response from block {block.name()}")

def get_value_from_nested_dict(self, data, keys):
for key in keys:
data = data[key]
return data

def set_value_in_nested_dict(self, data, keys, value):
for key in keys[:-1]:
data = data.setdefault(key, {})
data[keys[-1]] = value

def load_values_from_previous_block(self, previous_block, next_block):

previous_input_schema = json.loads(previous_block['input_schema'])
next_input_schema = json.loads(next_block['input_schema'])

for key, value in next_input_schema['query_params'].items():
if 'reference' in value:
reference_path = value['reference'].split('.')
value_from_previous = self.get_value_from_nested_dict(previous_input_schema, reference_path[1:])
self.set_value_in_nested_dict(next_input_schema, ['query_params', key, 'value'], value_from_previous)

next_block['input_schema'] = json.dumps(next_input_schema)
def __repr__(self):
pass

def dump(self):
pass


def get_os_info() -> str:
os_name = platform.system()
os_info = (
platform.platform(terse=True)
if os_name != "Linux"
else distro.name(pretty=True)
)
return os_info
Loading