Package imandrax_api
Sub-modules
imandrax_api.api_types_version
imandrax_api.bindings
imandrax_api.lib
imandrax_api.twirp
Classes
class BaseClient
-
Expand source code
class BaseClient:
    """Shared RPC wrappers used by ImandraX API clients.

    The methods below read ``self._client``, ``self._api_client``,
    ``self._sesh`` and ``self._timeout``. None of these attributes is
    assigned in this class, so a subclass has to set them before any of the
    methods is called.

    Every method that takes ``timeout`` resolves it as
    ``timeout or self._timeout``: a falsy value (``None`` or ``0``) falls
    back to the client's stored default.
    """

    def mk_context(self) -> Context:
        """Build a request context with the appropriate headers"""
        headers = {"x-api-version": api_types_version.api_types_version}
        return Context(headers=headers)

    def status(self):
        """Call the ``status`` RPC with an empty request and return its result."""
        ctx = self.mk_context()
        return self._client.status(ctx=ctx, request=utils_pb2.Empty())

    def decompose(
        self,
        name: str,
        assuming: Optional[str] = None,
        basis: Optional[list[str]] = [],
        rule_specs: Optional[list[str]] = [],
        prune: Optional[bool] = True,
        ctx_simp: Optional[bool] = None,
        lift_bool: Optional[simple_api_pb2.LiftBool] = None,
        timeout: Optional[float] = None,
        str: Optional[bool] = True,
    ) -> simple_api_pb2.DecomposeRes:
        """Call the ``decompose`` RPC for ``name`` in this client's session.

        Every argument except ``timeout`` is copied unchanged into the
        ``DecomposeReq`` field of the same name, together with
        ``session=self._sesh``.

        Note that the ``str`` parameter shadows the builtin ``str`` inside
        this method body. The ``[]`` defaults for ``basis`` and
        ``rule_specs`` are only forwarded to the request constructor; this
        method never mutates them.
        """
        deadline = timeout or self._timeout
        ctx = self.mk_context()
        req = simple_api_pb2.DecomposeReq(
            name=name,
            assuming=assuming,
            basis=basis,
            rule_specs=rule_specs,
            prune=prune,
            ctx_simp=ctx_simp,
            lift_bool=lift_bool,
            str=str,
            session=self._sesh,
        )
        return self._client.decompose(ctx=ctx, request=req, timeout=deadline)

    def eval_src(
        self,
        src: str,
        timeout: Optional[float] = None,
    ) -> simple_api_pb2.EvalRes:
        """Call the ``eval_src`` RPC with ``src`` in this client's session."""
        deadline = timeout or self._timeout
        ctx = self.mk_context()
        req = simple_api_pb2.EvalSrcReq(src=src, session=self._sesh)
        return self._client.eval_src(ctx=ctx, request=req, timeout=deadline)

    def verify_src(
        self,
        src: str,
        hints: Optional[str] = None,
        timeout: Optional[float] = None,
    ) -> simple_api_pb2.VerifyRes:
        """Call the ``verify_src`` RPC with ``src`` and optional ``hints``."""
        deadline = timeout or self._timeout
        ctx = self.mk_context()
        req = simple_api_pb2.VerifySrcReq(
            src=src, session=self._sesh, hints=hints
        )
        return self._client.verify_src(ctx=ctx, request=req, timeout=deadline)

    def instance_src(
        self,
        src: str,
        hints: Optional[str] = None,
        timeout: Optional[float] = None,
    ) -> simple_api_pb2.InstanceRes:
        """Call the ``instance_src`` RPC with ``src`` and optional ``hints``."""
        deadline = timeout or self._timeout
        ctx = self.mk_context()
        req = simple_api_pb2.InstanceSrcReq(
            src=src, session=self._sesh, hints=hints
        )
        return self._client.instance_src(ctx=ctx, request=req, timeout=deadline)

    def list_artifacts(
        self, task: task_pb2.Task, timeout: Optional[float] = None
    ) -> api_pb2.ArtifactListResult:
        """Call ``list_artifacts`` on the API client, querying by ``task.id``."""
        deadline = timeout or self._timeout
        ctx = self.mk_context()
        query = api_pb2.ArtifactListQuery(task_id=task.id)
        return self._api_client.list_artifacts(
            ctx=ctx, request=query, timeout=deadline
        )

    def get_artifact_zip(
        self, task: task_pb2.Task, kind: str, timeout: Optional[float] = None
    ) -> api_pb2.ArtifactZip:
        """Call ``get_artifact_zip`` on the API client for ``task.id`` and ``kind``."""
        deadline = timeout or self._timeout
        ctx = self.mk_context()
        query = api_pb2.ArtifactGetQuery(task_id=task.id, kind=kind)
        return self._api_client.get_artifact_zip(
            ctx=ctx, request=query, timeout=deadline
        )

    def typecheck(
        self, src: str, timeout: Optional[float] = None
    ) -> simple_api_pb2.TypecheckRes:
        """Call the ``typecheck`` RPC with ``src`` in this client's session."""
        deadline = timeout or self._timeout
        ctx = self.mk_context()
        req = simple_api_pb2.TypecheckReq(src=src, session=self._sesh)
        return self._client.typecheck(ctx=ctx, request=req, timeout=deadline)
Subclasses
Methods
def decompose(self,
name: str,
assuming: str | None = None,
basis: list[str] | None = [],
rule_specs: list[str] | None = [],
prune: bool | None = True,
ctx_simp: bool | None = None,
lift_bool: simple_api_pb2.LiftBool | None = None,
timeout: float | None = None,
str: bool | None = True) ‑> simple_api_pb2.DecomposeRes-
Expand source code
def decompose(
    self,
    name: str,
    assuming: Optional[str] = None,
    basis: Optional[list[str]] = [],
    rule_specs: Optional[list[str]] = [],
    prune: Optional[bool] = True,
    ctx_simp: Optional[bool] = None,
    lift_bool: Optional[simple_api_pb2.LiftBool] = None,
    timeout: Optional[float] = None,
    str: Optional[bool] = True,
) -> simple_api_pb2.DecomposeRes:
    """Call the ``decompose`` RPC for ``name`` in this client's session.

    Every argument except ``timeout`` is copied unchanged into the
    ``DecomposeReq`` field of the same name, together with
    ``session=self._sesh``. A falsy ``timeout`` (``None`` or ``0``) falls
    back to ``self._timeout``.

    Note that the ``str`` parameter shadows the builtin ``str`` inside this
    method body. The ``[]`` defaults for ``basis`` and ``rule_specs`` are
    only forwarded to the request constructor; this method never mutates
    them.
    """
    deadline = timeout or self._timeout
    ctx = self.mk_context()
    req = simple_api_pb2.DecomposeReq(
        name=name,
        assuming=assuming,
        basis=basis,
        rule_specs=rule_specs,
        prune=prune,
        ctx_simp=ctx_simp,
        lift_bool=lift_bool,
        str=str,
        session=self._sesh,
    )
    return self._client.decompose(ctx=ctx, request=req, timeout=deadline)
def eval_src(self, src: str, timeout: float | None = None) ‑> simple_api_pb2.EvalRes
-
Expand source code
def eval_src(
    self,
    src: str,
    timeout: Optional[float] = None,
) -> simple_api_pb2.EvalRes:
    """Call the ``eval_src`` RPC with ``src`` in this client's session.

    A falsy ``timeout`` (``None`` or ``0``) falls back to ``self._timeout``.
    """
    deadline = timeout or self._timeout
    ctx = self.mk_context()
    req = simple_api_pb2.EvalSrcReq(src=src, session=self._sesh)
    return self._client.eval_src(ctx=ctx, request=req, timeout=deadline)
def get_artifact_zip(self, task: task_pb2.Task, kind: str, timeout: float | None = None) ‑> api_pb2.ArtifactZip
-
Expand source code
def get_artifact_zip(
    self, task: task_pb2.Task, kind: str, timeout: Optional[float] = None
) -> api_pb2.ArtifactZip:
    """Call ``get_artifact_zip`` on the API client for ``task.id`` and ``kind``.

    Both values are placed in an ``ArtifactGetQuery``. A falsy ``timeout``
    (``None`` or ``0``) falls back to ``self._timeout``.
    """
    deadline = timeout or self._timeout
    ctx = self.mk_context()
    query = api_pb2.ArtifactGetQuery(task_id=task.id, kind=kind)
    return self._api_client.get_artifact_zip(
        ctx=ctx, request=query, timeout=deadline
    )
def instance_src(self, src: str, hints: str | None = None, timeout: float | None = None) ‑> simple_api_pb2.InstanceRes
-
Expand source code
def instance_src(
    self,
    src: str,
    hints: Optional[str] = None,
    timeout: Optional[float] = None,
) -> simple_api_pb2.InstanceRes:
    """Call the ``instance_src`` RPC with ``src`` and optional ``hints``.

    The request also carries ``session=self._sesh``. A falsy ``timeout``
    (``None`` or ``0``) falls back to ``self._timeout``.
    """
    deadline = timeout or self._timeout
    ctx = self.mk_context()
    req = simple_api_pb2.InstanceSrcReq(
        src=src, session=self._sesh, hints=hints
    )
    return self._client.instance_src(ctx=ctx, request=req, timeout=deadline)
def list_artifacts(self, task: task_pb2.Task, timeout: float | None = None) ‑> api_pb2.ArtifactListResult
-
Expand source code
def list_artifacts(
    self, task: task_pb2.Task, timeout: Optional[float] = None
) -> api_pb2.ArtifactListResult:
    """Call ``list_artifacts`` on the API client, querying by ``task.id``.

    A falsy ``timeout`` (``None`` or ``0``) falls back to ``self._timeout``.
    """
    deadline = timeout or self._timeout
    ctx = self.mk_context()
    query = api_pb2.ArtifactListQuery(task_id=task.id)
    return self._api_client.list_artifacts(
        ctx=ctx, request=query, timeout=deadline
    )
def mk_context(self) ‑> Context
-
Expand source code
def mk_context(self) -> Context:
    """Build a request context with the appropriate headers

    The only header set here is ``x-api-version``, taken from
    ``api_types_version.api_types_version``.
    """
    headers = {"x-api-version": api_types_version.api_types_version}
    return Context(headers=headers)
Build a request context with the appropriate headers
def status(self)
-
Expand source code
def status(self):
    """Call the ``status`` RPC with an empty request and return its result."""
    ctx = self.mk_context()
    return self._client.status(ctx=ctx, request=utils_pb2.Empty())
def typecheck(self, src: str, timeout: float | None = None) ‑> simple_api_pb2.TypecheckRes
-
Expand source code
def typecheck(
    self, src: str, timeout: Optional[float] = None
) -> simple_api_pb2.TypecheckRes:
    """Call the ``typecheck`` RPC with ``src`` in this client's session.

    A falsy ``timeout`` (``None`` or ``0``) falls back to ``self._timeout``.
    """
    deadline = timeout or self._timeout
    ctx = self.mk_context()
    req = simple_api_pb2.TypecheckReq(src=src, session=self._sesh)
    return self._client.typecheck(ctx=ctx, request=req, timeout=deadline)
def verify_src(self, src: str, hints: str | None = None, timeout: float | None = None) ‑> simple_api_pb2.VerifyRes
-
Expand source code
def verify_src(
    self,
    src: str,
    hints: Optional[str] = None,
    timeout: Optional[float] = None,
) -> simple_api_pb2.VerifyRes:
    """Call the ``verify_src`` RPC with ``src`` and optional ``hints``.

    The request also carries ``session=self._sesh``. A falsy ``timeout``
    (``None`` or ``0``) falls back to ``self._timeout``.
    """
    deadline = timeout or self._timeout
    ctx = self.mk_context()
    req = simple_api_pb2.VerifySrcReq(
        src=src, session=self._sesh, hints=hints
    )
    return self._client.verify_src(ctx=ctx, request=req, timeout=deadline)
class Client (url: str = 'https://api.imandra.ai/internal/imandrax/',
server_path_prefix='/api/v1',
auth_token: str | None = None,
timeout: int = 30,
session_id: str | None = None)-
Expand source code
class Client(BaseClient):
    """Synchronous client that talks to the API over a shared ``requests`` session.

    Construction either creates a server-side session through the
    ``create_session`` RPC or wraps an existing session id. The object can be
    used as a context manager; leaving the ``with`` block ends the session
    and closes the underlying HTTP session.
    """

    def __init__(
        self,
        url: str = url_prod,
        server_path_prefix="/api/v1",
        auth_token: str | None = None,
        timeout: int = 30,
        session_id: str | None = None,
    ) -> None:
        """Set up the HTTP session and RPC clients, then obtain a session.

        Args:
            url: Base URL handed to both Twirp clients.
            server_path_prefix: Path prefix handed to both Twirp clients.
            auth_token: When truthy, sent as a ``Bearer`` ``Authorization``
                header on the shared HTTP session.
            timeout: Passed to both Twirp clients and to ``create_session``;
                also stored as the default per-call timeout.
            session_id: When ``None``, a new session is created via RPC.
                Otherwise a ``Session`` message carrying this id is built
                locally and no RPC is made.

        Raises:
            Exception: If ``create_session`` fails with the
                ``Errors.InvalidArgument`` code (reported as an API version
                mismatch).
            TwirpServerException: Any other server error from
                ``create_session`` is re-raised unchanged.
        """
        # use a session to help with cookies. See https://requests.readthedocs.io/en/latest/user/advanced/#session-objects
        self._session = requests.Session()
        self._closed = False
        self._auth_token = auth_token
        if auth_token:
            self._session.headers["Authorization"] = f"Bearer {auth_token}"
        self._url = url
        self._server_path_prefix = server_path_prefix
        self._client = simple_api_twirp.SimpleClient(
            url,
            timeout=timeout,
            server_path_prefix=server_path_prefix,
            session=self._session,
        )
        self._api_client = api_twirp.EvalClient(
            url,
            timeout=timeout,
            server_path_prefix=server_path_prefix,
            session=self._session,
        )
        self._timeout = timeout

        if session_id is None:
            try:
                self._sesh = self._client.create_session(
                    ctx=self.mk_context(),
                    request=simple_api_pb2.SessionCreateReq(
                        api_version=api_types_version.api_types_version
                    ),
                    timeout=timeout,
                )
            except TwirpServerException as ex:
                if ex.meta.get("body", {}).get("code") == Errors.InvalidArgument:
                    raise Exception(
                        "API version mismatch. Try upgrading the imandrax-api package."
                    ) from ex
                # Bare raise keeps the original traceback intact.
                raise
        else:
            # TODO: actually re-open session via RPC
            self._sesh = session_pb2.Session(
                session_id=session_id,
            )

    def __enter__(self, *_):
        """Return the client itself for use in a ``with`` statement."""
        return self

    def __exit__(self, *_) -> None:
        """End the server-side session (if any) and close the HTTP session.

        Safe to call more than once: after the first call the client is
        marked closed and later calls return immediately.

        Raises:
            Exception: If the ``end_session`` RPC fails with a
                ``TwirpServerException``. The HTTP session is still closed
                and the client is still marked closed in that case.
        """
        # `_closed` is missing only if __init__ failed before assigning it;
        # treat that the same as already closed.
        if getattr(self, "_closed", True):
            return
        try:
            # `_sesh` is absent when create_session failed in __init__; there
            # is then no server-side session to end.
            if hasattr(self, "_sesh"):
                self._client.end_session(
                    ctx=self.mk_context(), request=self._sesh, timeout=None
                )
        except TwirpServerException as e:
            raise Exception("Error while ending session") from e
        finally:
            # Always release the HTTP session and mark the client closed, so
            # a failed end_session neither leaks connections nor makes
            # __del__ repeat the failing RPC at garbage collection.
            self._session.close()
            self._closed = True

    def __del__(self):
        """Run the same cleanup as ``__exit__`` when the object is collected."""
        self.__exit__()
Ancestors
Subclasses
Inherited members