Module imandra.cli.ipl_log_analysis
Functions
def check_imandra_ready(port, timeout)
-
Expand source code
def check_imandra_ready(port, timeout): request = urllib.request.Request(f"http://localhost:{port}/status") for x in range(timeout): time.sleep(1) try: response = urllib.request.urlopen(request) if response.read().decode("UTF-8") == "OK": return True except: pass return False
def is_loopback(host)
-
Expand source code
def is_loopback(host): for family in (socket.AF_INET, socket.AF_INET6): try: r = socket.getaddrinfo(host, None, family, socket.SOCK_STREAM) except socket.gaierror: return False for family, _, _, _, sockaddr in r: if not ipaddress.ip_address(sockaddr[0]).is_loopback: return False return True
Classes
class IplLogAnalysis (auth,
ipl_file,
traces,
organization,
callback,
interactive,
imandra_host,
imandra_port,
json_out,
decomp_job_id,
runner_image,
sender_comp_id)-
Expand source code
class IplLogAnalysis: def __init__( self, auth, ipl_file, traces, organization, callback, interactive, imandra_host, imandra_port, json_out, decomp_job_id, runner_image, sender_comp_id, ): self._auth = auth self._ipl_file = ipl_file self._traces_dir = Path(traces).absolute() self._organization = organization self._callback = callback self._interactive = interactive == "true" self._imandra_host = imandra_host self._imandra_port = imandra_port self._json_out = Path(json_out).absolute() if json_out is not None else None self._decomp_job_id = decomp_job_id self._runner_image = runner_image self._sender_comp_id = sender_comp_id def login(self): self._auth.login() def _start_imandra(self, timeout): self._imandra_host = "localhost" self._imandra_port = 3000 args = [ "imandra-http-api", "--skip-update", "--dir", self.job_model_path(), ] imandra_proc = subprocess.Popen( args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) imandra_pid = imandra_proc.pid def kill_imandra(): os.kill(imandra_pid, signal.SIGTERM) atexit.register(kill_imandra) self._imandra_proc = imandra_proc self._imandra_pid = imandra_pid return check_imandra_ready(self._imandra_port, timeout) def start_imandra(self, timeout): if self._imandra_host is not None or self._imandra_port is not None: self._imandra_host = ( self._imandra_host if self._imandra_host is not None else "localhost" ) self._imandra_port = ( self._imandra_port if self._imandra_port is not None else 3000 ) return True else: return self._start_imandra(timeout) def submit_job(self): job_id = api.ipl.log_analysis_builder( self._auth, self._ipl_file, self._organization, self._callback, self._decomp_job_id, ) self._job_id = job_id return job_id def use_job(self, uuid): self._job_id = uuid def data_dir(self): return os.path.join(self._auth.folder_path, "data") def job_data_dir(self): return os.path.join(self.data_dir(), self._job_id) def find_local_job(self): if not self._ipl_file: return None data_dir = self.data_dir() def is_candidate(p): return os.path.isdir( os.path.join(data_dir, p, "lib", "ipl-log-analysis") ) and os.path.isfile(os.path.join(data_dir, p, "model.ipl")) job_dirs = list( filter( is_candidate, os.listdir(data_dir), ) ) for d in job_dirs: model_file = os.path.join(data_dir, d, "model.ipl") if filecmp.cmp(model_file, self._ipl_file): return d return None def job_archive_path(self): return os.path.join(self.data_dir(), "{}.{}".format(self._job_id, "tar.gz")) def job_model_path(self): return os.path.join(self.job_data_dir(), "ipl", "gen", "model") def prepare_job_data(self): data_dir = self.data_dir() if not os.path.exists(data_dir): print(f"Creating directory {data_dir}...") os.mkdir(data_dir) job_archive_path = self.job_archive_path() if not os.path.exists(job_archive_path): response = api.ipl.data(self._auth, self._job_id) with open(job_archive_path, "wb") as data_file: data_file.write(response["content"]) job_data_dir = self.job_data_dir() if not os.path.exists(job_data_dir): print(f"Extracting job data in {job_data_dir}...") os.mkdir(job_data_dir) tar = tarfile.open(job_archive_path, "r:gz") tar.extractall(job_data_dir) tar.close() response = api.ipl.data(self._auth, self._job_id, file="version") with open(f"{job_data_dir}/version", "wb") as version_file: version_file.write(response["content"]) ipl_file_copy = os.path.join(job_data_dir, "model.ipl") shutil.copyfile(src=self._ipl_file, dst=ipl_file_copy) def fetch_runner_binary(self, version): runner_dir = os.path.join(self.data_dir(), "log_analysis_runner") if not os.path.exists(runner_dir): os.mkdir(runner_dir) runner_filename = f"runner-static-bytecode-linux-{version}.tar.gz" runner_path = os.path.join(runner_dir, runner_filename) url = f"https://storage.googleapis.com/ipl-log-analysis-releases/{runner_filename}" request = urllib.request.Request(url) response = urllib.request.urlopen(request) content = response.read() with open(runner_path, "wb") as data_file: data_file.write(content) return runner_path def run_binary(self): job_data_dir = self.job_data_dir() with open(os.path.join(job_data_dir, "version"), "r") as version_file: job_runner_version = version_file.read().strip() try: runner_path = self.fetch_runner_binary(job_runner_version) except urllib.error.HTTPError as e: if e.code == 404: print( f"WARNING: runner binary version {job_runner_version} not found. Using latest version..." ) runner_path = self.fetch_runner_binary("latest") else: raise ValueError(e.read().decode("utf-8")) print("Extracting runner binary...") runner_tar = tarfile.open(runner_path, "r:gz") runner_tar.extractall(job_data_dir) runner_tar.close() installed_runner_path = os.path.join(job_data_dir, "bin", "ipl_log_analysis") run_args = [ f"--imandra-port={self._imandra_port}", "--fix-traces", self._traces_dir, ] if self._interactive: run_args.append("--interactive=true") if self._json_out: run_args.append(f"--json-out={self._json_out}") if self._sender_comp_id: run_args.append(f"--sender-comp-id={self._sender_comp_id}") run_args.append(f"--imandra-host={self._imandra_host}") run_cmd = [installed_runner_path] + run_args self.run_analysis_process(run_cmd) def runner_image(self): if self._runner_image: print(f"Using runner image {self._runner_image}...") return self._runner_image else: env = os.getenv("IMANDRA_ENV", "prod") if env == "dev": runner_image = "europe-west1-docker.pkg.dev/imandra-dev/imandra/ipl-log-analysis-runner" else: runner_image = "imandra/ipl-log-analysis-runner" with open( os.path.join(self.job_data_dir(), "version"), "r" ) as version_file: job_runner_version = version_file.read().strip() tagged_runner_image = f"{runner_image}:{job_runner_version}" pull_p = subprocess.run( ["docker", "pull", tagged_runner_image], stdout=sys.stdout, stderr=subprocess.STDOUT, ) if pull_p.returncode != 0: print( f"WARNING: runner image {runner_image} with tag {job_runner_version} not found. Using latest version..." ) tagged_runner_image = f"{runner_image}:latest" pull_p = subprocess.run( ["docker", "pull", tagged_runner_image], stdout=sys.stdout, stderr=subprocess.STDOUT, ) if pull_p.returncode != 0: print("Failed to pull runner image") return None return tagged_runner_image def run_docker(self): try: subprocess.run(["docker", "--version"], stdout=subprocess.DEVNULL) except FileNotFoundError as e: print(e) return runner_image = self.runner_image() if runner_image is None: sys.exit(1) runner_args_list = [ f"--imandra-port={self._imandra_port}", "--fix-traces ../traces", ] if self._interactive: runner_args_list.append("--interactive true") if self._json_out is not None: runner_args_list.append("--json-out=out.json") if self._sender_comp_id: runner_args_list.append(f"--sender-comp-id={self._sender_comp_id}") if not is_loopback(self._imandra_host): runner_args_list.append(f"--imandra-host={self._imandra_host}") else: runner_args_list.append("--imandra-host=host.docker.internal") runner_args = " ".join(runner_args_list) runner_script = f"cd ipl-log-analysis && sudo tar zxf plugin.tar.gz && ./bin/ipl_log_analysis {runner_args}" if self._json_out is not None: # --json-out needs output file to exist os.close(os.open(self._json_out, os.O_CREAT)) runner_script = "touch ipl-log-analysis/out.json && " + runner_script mount_points = [ "-v", f"{self.job_archive_path()}:/home/ocaml/ipl-log-analysis/plugin.tar.gz", "-v", f"{self._traces_dir}:/home/ocaml/traces", ] if self._json_out is not None: mount_points = mount_points + [ "-v", f"{self._json_out}:/home/ocaml/ipl-log-analysis/out.json", ] docker_args = mount_points + [ "--platform=linux/amd64", "--rm", runner_image, "sh", "-uexc", runner_script, ] if self._interactive: docker_cmd = ["docker", "run"] + ["-i"] + docker_args else: docker_cmd = ["docker", "run"] + docker_args self.run_analysis_process(docker_cmd) def run_analysis_process(self, cmd): input_buffer = sys.stdin output_buffer = sys.stdout proc = subprocess.Popen( cmd, encoding="utf-8", stdin=subprocess.PIPE, stdout=output_buffer, universal_newlines=True, stderr=subprocess.STDOUT, ) if self._interactive: while True: try: inpt = input_buffer.readline() print(inpt, file=proc.stdin, flush=True) except BrokenPipeError: return else: while proc.poll() is None: time.sleep(0.5)
Methods
def data_dir(self)
-
Expand source code
def data_dir(self): return os.path.join(self._auth.folder_path, "data")
def fetch_runner_binary(self, version)
-
Expand source code
def fetch_runner_binary(self, version): runner_dir = os.path.join(self.data_dir(), "log_analysis_runner") if not os.path.exists(runner_dir): os.mkdir(runner_dir) runner_filename = f"runner-static-bytecode-linux-{version}.tar.gz" runner_path = os.path.join(runner_dir, runner_filename) url = f"https://storage.googleapis.com/ipl-log-analysis-releases/{runner_filename}" request = urllib.request.Request(url) response = urllib.request.urlopen(request) content = response.read() with open(runner_path, "wb") as data_file: data_file.write(content) return runner_path
def find_local_job(self)
-
Expand source code
def find_local_job(self): if not self._ipl_file: return None data_dir = self.data_dir() def is_candidate(p): return os.path.isdir( os.path.join(data_dir, p, "lib", "ipl-log-analysis") ) and os.path.isfile(os.path.join(data_dir, p, "model.ipl")) job_dirs = list( filter( is_candidate, os.listdir(data_dir), ) ) for d in job_dirs: model_file = os.path.join(data_dir, d, "model.ipl") if filecmp.cmp(model_file, self._ipl_file): return d return None
def job_archive_path(self)
-
Expand source code
def job_archive_path(self): return os.path.join(self.data_dir(), "{}.{}".format(self._job_id, "tar.gz"))
def job_data_dir(self)
-
Expand source code
def job_data_dir(self): return os.path.join(self.data_dir(), self._job_id)
def job_model_path(self)
-
Expand source code
def job_model_path(self): return os.path.join(self.job_data_dir(), "ipl", "gen", "model")
def login(self)
-
Expand source code
def login(self): self._auth.login()
def prepare_job_data(self)
-
Expand source code
def prepare_job_data(self): data_dir = self.data_dir() if not os.path.exists(data_dir): print(f"Creating directory {data_dir}...") os.mkdir(data_dir) job_archive_path = self.job_archive_path() if not os.path.exists(job_archive_path): response = api.ipl.data(self._auth, self._job_id) with open(job_archive_path, "wb") as data_file: data_file.write(response["content"]) job_data_dir = self.job_data_dir() if not os.path.exists(job_data_dir): print(f"Extracting job data in {job_data_dir}...") os.mkdir(job_data_dir) tar = tarfile.open(job_archive_path, "r:gz") tar.extractall(job_data_dir) tar.close() response = api.ipl.data(self._auth, self._job_id, file="version") with open(f"{job_data_dir}/version", "wb") as version_file: version_file.write(response["content"]) ipl_file_copy = os.path.join(job_data_dir, "model.ipl") shutil.copyfile(src=self._ipl_file, dst=ipl_file_copy)
def run_analysis_process(self, cmd)
-
Expand source code
def run_analysis_process(self, cmd): input_buffer = sys.stdin output_buffer = sys.stdout proc = subprocess.Popen( cmd, encoding="utf-8", stdin=subprocess.PIPE, stdout=output_buffer, universal_newlines=True, stderr=subprocess.STDOUT, ) if self._interactive: while True: try: inpt = input_buffer.readline() print(inpt, file=proc.stdin, flush=True) except BrokenPipeError: return else: while proc.poll() is None: time.sleep(0.5)
def run_binary(self)
-
Expand source code
def run_binary(self): job_data_dir = self.job_data_dir() with open(os.path.join(job_data_dir, "version"), "r") as version_file: job_runner_version = version_file.read().strip() try: runner_path = self.fetch_runner_binary(job_runner_version) except urllib.error.HTTPError as e: if e.code == 404: print( f"WARNING: runner binary version {job_runner_version} not found. Using latest version..." ) runner_path = self.fetch_runner_binary("latest") else: raise ValueError(e.read().decode("utf-8")) print("Extracting runner binary...") runner_tar = tarfile.open(runner_path, "r:gz") runner_tar.extractall(job_data_dir) runner_tar.close() installed_runner_path = os.path.join(job_data_dir, "bin", "ipl_log_analysis") run_args = [ f"--imandra-port={self._imandra_port}", "--fix-traces", self._traces_dir, ] if self._interactive: run_args.append("--interactive=true") if self._json_out: run_args.append(f"--json-out={self._json_out}") if self._sender_comp_id: run_args.append(f"--sender-comp-id={self._sender_comp_id}") run_args.append(f"--imandra-host={self._imandra_host}") run_cmd = [installed_runner_path] + run_args self.run_analysis_process(run_cmd)
def run_docker(self)
-
Expand source code
def run_docker(self): try: subprocess.run(["docker", "--version"], stdout=subprocess.DEVNULL) except FileNotFoundError as e: print(e) return runner_image = self.runner_image() if runner_image is None: sys.exit(1) runner_args_list = [ f"--imandra-port={self._imandra_port}", "--fix-traces ../traces", ] if self._interactive: runner_args_list.append("--interactive true") if self._json_out is not None: runner_args_list.append("--json-out=out.json") if self._sender_comp_id: runner_args_list.append(f"--sender-comp-id={self._sender_comp_id}") if not is_loopback(self._imandra_host): runner_args_list.append(f"--imandra-host={self._imandra_host}") else: runner_args_list.append("--imandra-host=host.docker.internal") runner_args = " ".join(runner_args_list) runner_script = f"cd ipl-log-analysis && sudo tar zxf plugin.tar.gz && ./bin/ipl_log_analysis {runner_args}" if self._json_out is not None: # --json-out needs output file to exist os.close(os.open(self._json_out, os.O_CREAT)) runner_script = "touch ipl-log-analysis/out.json && " + runner_script mount_points = [ "-v", f"{self.job_archive_path()}:/home/ocaml/ipl-log-analysis/plugin.tar.gz", "-v", f"{self._traces_dir}:/home/ocaml/traces", ] if self._json_out is not None: mount_points = mount_points + [ "-v", f"{self._json_out}:/home/ocaml/ipl-log-analysis/out.json", ] docker_args = mount_points + [ "--platform=linux/amd64", "--rm", runner_image, "sh", "-uexc", runner_script, ] if self._interactive: docker_cmd = ["docker", "run"] + ["-i"] + docker_args else: docker_cmd = ["docker", "run"] + docker_args self.run_analysis_process(docker_cmd)
def runner_image(self)
-
Expand source code
def runner_image(self): if self._runner_image: print(f"Using runner image {self._runner_image}...") return self._runner_image else: env = os.getenv("IMANDRA_ENV", "prod") if env == "dev": runner_image = "europe-west1-docker.pkg.dev/imandra-dev/imandra/ipl-log-analysis-runner" else: runner_image = "imandra/ipl-log-analysis-runner" with open( os.path.join(self.job_data_dir(), "version"), "r" ) as version_file: job_runner_version = version_file.read().strip() tagged_runner_image = f"{runner_image}:{job_runner_version}" pull_p = subprocess.run( ["docker", "pull", tagged_runner_image], stdout=sys.stdout, stderr=subprocess.STDOUT, ) if pull_p.returncode != 0: print( f"WARNING: runner image {runner_image} with tag {job_runner_version} not found. Using latest version..." ) tagged_runner_image = f"{runner_image}:latest" pull_p = subprocess.run( ["docker", "pull", tagged_runner_image], stdout=sys.stdout, stderr=subprocess.STDOUT, ) if pull_p.returncode != 0: print("Failed to pull runner image") return None return tagged_runner_image
def start_imandra(self, timeout)
-
Expand source code
def start_imandra(self, timeout): if self._imandra_host is not None or self._imandra_port is not None: self._imandra_host = ( self._imandra_host if self._imandra_host is not None else "localhost" ) self._imandra_port = ( self._imandra_port if self._imandra_port is not None else 3000 ) return True else: return self._start_imandra(timeout)
def submit_job(self)
-
Expand source code
def submit_job(self): job_id = api.ipl.log_analysis_builder( self._auth, self._ipl_file, self._organization, self._callback, self._decomp_job_id, ) self._job_id = job_id return job_id
def use_job(self, uuid)
-
Expand source code
def use_job(self, uuid): self._job_id = uuid