use std::borrow::Borrow;
use std::cell::{Cell, Ref, RefCell};
use std::collections::hash_map::HashMap;
use std::collections::hash_set::HashSet;
use std::fmt;
use std::rc::Rc;
use std::sync::{Mutex, MutexGuard};
use arr_macro::arr;
use num_bigint::BigInt;
use num_traits::ToPrimitive;
use once_cell::sync::Lazy;
#[cfg(feature = "rustpython-compiler")]
use rustpython_compiler::{compile, error::CompileError};
use crate::builtins::{self, to_ascii};
use crate::bytecode;
use crate::exceptions::{PyBaseException, PyBaseExceptionRef};
use crate::frame::{ExecutionResult, Frame, FrameRef};
use crate::frozen;
use crate::function::{OptionalArg, PyFuncArgs};
use crate::import;
use crate::obj::objbool;
use crate::obj::objcode::{PyCode, PyCodeRef};
use crate::obj::objdict::PyDictRef;
use crate::obj::objint::PyInt;
use crate::obj::objiter;
use crate::obj::objlist::PyList;
use crate::obj::objmodule::{self, PyModule};
use crate::obj::objobject;
use crate::obj::objstr::{PyString, PyStringRef};
use crate::obj::objtuple::PyTuple;
use crate::obj::objtype::{self, PyClassRef};
use crate::pyhash;
use crate::pyobject::{
IdProtocol, ItemProtocol, PyContext, PyObject, PyObjectRef, PyResult, PyValue, TryFromObject,
TryIntoRef, TypeProtocol,
};
use crate::scope::Scope;
use crate::stdlib;
use crate::sysmodule;
pub struct VirtualMachine {
pub builtins: PyObjectRef,
pub sys_module: PyObjectRef,
pub stdlib_inits: RefCell<HashMap<String, stdlib::StdlibInitFunc>>,
pub ctx: PyContext,
pub frames: RefCell<Vec<FrameRef>>,
pub wasm_id: Option<String>,
pub exceptions: RefCell<Vec<PyBaseExceptionRef>>,
pub frozen: RefCell<HashMap<String, bytecode::FrozenModule>>,
pub import_func: RefCell<PyObjectRef>,
pub profile_func: RefCell<PyObjectRef>,
pub trace_func: RefCell<PyObjectRef>,
pub use_tracing: RefCell<bool>,
pub signal_handlers: RefCell<[PyObjectRef; NSIG]>,
pub settings: PySettings,
pub recursion_limit: Cell<usize>,
pub codec_registry: RefCell<Vec<PyObjectRef>>,
pub initialized: bool,
}
pub const NSIG: usize = 64;
#[derive(Copy, Clone)]
pub enum InitParameter {
NoInitialize,
InitializeInternal,
InitializeExternal,
}
pub struct PySettings {
pub debug: bool,
pub inspect: bool,
pub optimize: u8,
pub no_user_site: bool,
pub no_site: bool,
pub ignore_environment: bool,
pub verbose: u8,
pub quiet: bool,
pub dont_write_bytecode: bool,
pub path_list: Vec<String>,
pub argv: Vec<String>,
pub initialization_parameter: InitParameter,
}
enum TraceEvent {
Call,
Return,
}
impl fmt::Display for TraceEvent {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
use TraceEvent::*;
match self {
Call => write!(f, "call"),
Return => write!(f, "return"),
}
}
}
impl Default for PySettings {
fn default() -> Self {
PySettings {
debug: false,
inspect: false,
optimize: 0,
no_user_site: false,
no_site: false,
ignore_environment: false,
verbose: 0,
quiet: false,
dont_write_bytecode: false,
path_list: vec![],
argv: vec![],
initialization_parameter: InitParameter::InitializeExternal,
}
}
}
impl VirtualMachine {
pub fn new(settings: PySettings) -> VirtualMachine {
flame_guard!("new VirtualMachine");
let ctx = PyContext::new();
let new_module =
|dict| PyObject::new(PyModule {}, ctx.types.module_type.clone(), Some(dict));
let builtins_dict = ctx.new_dict();
let builtins = new_module(builtins_dict.clone());
let sysmod_dict = ctx.new_dict();
let sysmod = new_module(sysmod_dict.clone());
let stdlib_inits = RefCell::new(stdlib::get_module_inits());
let frozen = RefCell::new(frozen::get_module_inits());
let import_func = RefCell::new(ctx.none());
let profile_func = RefCell::new(ctx.none());
let trace_func = RefCell::new(ctx.none());
let signal_handlers = RefCell::new(arr![ctx.none(); 64]);
let initialize_parameter = settings.initialization_parameter;
let mut vm = VirtualMachine {
builtins: builtins.clone(),
sys_module: sysmod.clone(),
stdlib_inits,
ctx,
frames: RefCell::new(vec![]),
wasm_id: None,
exceptions: RefCell::new(vec![]),
frozen,
import_func,
profile_func,
trace_func,
use_tracing: RefCell::new(false),
signal_handlers,
settings,
recursion_limit: Cell::new(512),
codec_registry: RefCell::default(),
initialized: false,
};
objmodule::init_module_dict(
&vm,
&builtins_dict,
vm.new_str("builtins".to_owned()),
vm.get_none(),
);
objmodule::init_module_dict(
&vm,
&sysmod_dict,
vm.new_str("sys".to_owned()),
vm.get_none(),
);
vm.initialize(initialize_parameter);
vm
}
pub fn initialize(&mut self, initialize_parameter: InitParameter) {
flame_guard!("init VirtualMachine");
match initialize_parameter {
InitParameter::NoInitialize => {}
_ => {
if self.initialized {
panic!("Double Initialize Error");
}
builtins::make_module(self, self.builtins.clone());
sysmodule::make_module(self, self.sys_module.clone(), self.builtins.clone());
#[cfg(not(target_arch = "wasm32"))]
import::import_builtin(self, "signal").expect("Couldn't initialize signal module");
import::init_importlib(self, initialize_parameter)
.expect("Initialize importlib fail");
self.initialized = true;
}
}
}
pub fn run_code_obj(&self, code: PyCodeRef, scope: Scope) -> PyResult {
let frame = Frame::new(code, scope).into_ref(self);
self.run_frame_full(frame)
}
pub fn run_frame_full(&self, frame: FrameRef) -> PyResult {
match self.run_frame(frame)? {
ExecutionResult::Return(value) => Ok(value),
_ => panic!("Got unexpected result from function"),
}
}
pub fn run_frame(&self, frame: FrameRef) -> PyResult<ExecutionResult> {
self.check_recursive_call("")?;
self.frames.borrow_mut().push(frame.clone());
let result = frame.run(self);
self.frames.borrow_mut().pop();
result
}
fn check_recursive_call(&self, _where: &str) -> PyResult<()> {
if self.frames.borrow().len() > self.recursion_limit.get() {
Err(self.new_recursion_error(format!("maximum recursion depth exceeded {}", _where)))
} else {
Ok(())
}
}
pub fn current_frame(&self) -> Option<Ref<FrameRef>> {
let frames = self.frames.borrow();
if frames.is_empty() {
None
} else {
Some(Ref::map(self.frames.borrow(), |frames| {
frames.last().unwrap()
}))
}
}
pub fn current_scope(&self) -> Ref<Scope> {
let frame = self
.current_frame()
.expect("called current_scope but no frames on the stack");
Ref::map(frame, |f| &f.scope)
}
pub fn try_class(&self, module: &str, class: &str) -> PyResult<PyClassRef> {
let class = self
.get_attribute(self.import(module, &[], 0)?, class)?
.downcast()
.expect("not a class");
Ok(class)
}
pub fn class(&self, module: &str, class: &str) -> PyClassRef {
let module = self
.import(module, &[], 0)
.unwrap_or_else(|_| panic!("unable to import {}", module));
let class = self
.get_attribute(module.clone(), class)
.unwrap_or_else(|_| panic!("module {} has no class {}", module, class));
class.downcast().expect("not a class")
}
pub fn new_str(&self, s: String) -> PyObjectRef {
self.ctx.new_str(s)
}
#[inline]
pub fn new_int<T: Into<BigInt> + ToPrimitive>(&self, i: T) -> PyObjectRef {
self.ctx.new_int(i)
}
#[inline]
pub fn new_bool(&self, b: bool) -> PyObjectRef {
self.ctx.new_bool(b)
}
pub fn new_module(&self, name: &str, dict: PyDictRef) -> PyObjectRef {
objmodule::init_module_dict(self, &dict, self.new_str(name.to_owned()), self.get_none());
PyObject::new(PyModule {}, self.ctx.types.module_type.clone(), Some(dict))
}
pub fn new_exception(
&self,
exc_type: PyClassRef,
args: Vec<PyObjectRef>,
) -> PyBaseExceptionRef {
vm_trace!("New exception created: {}", exc_type.name);
PyBaseException::new(args, self)
.into_ref_with_type_unchecked(exc_type, Some(self.ctx.new_dict()))
}
pub fn new_exception_empty(&self, exc_type: PyClassRef) -> PyBaseExceptionRef {
self.new_exception(exc_type, vec![])
}
pub fn new_exception_msg(&self, exc_type: PyClassRef, msg: String) -> PyBaseExceptionRef {
self.new_exception(exc_type, vec![self.new_str(msg)])
}
pub fn new_lookup_error(&self, msg: String) -> PyBaseExceptionRef {
let lookup_error = self.ctx.exceptions.lookup_error.clone();
self.new_exception_msg(lookup_error, msg)
}
pub fn new_attribute_error(&self, msg: String) -> PyBaseExceptionRef {
let attribute_error = self.ctx.exceptions.attribute_error.clone();
self.new_exception_msg(attribute_error, msg)
}
pub fn new_type_error(&self, msg: String) -> PyBaseExceptionRef {
let type_error = self.ctx.exceptions.type_error.clone();
self.new_exception_msg(type_error, msg)
}
pub fn new_name_error(&self, msg: String) -> PyBaseExceptionRef {
let name_error = self.ctx.exceptions.name_error.clone();
self.new_exception_msg(name_error, msg)
}
pub fn new_unsupported_operand_error(
&self,
a: PyObjectRef,
b: PyObjectRef,
op: &str,
) -> PyBaseExceptionRef {
self.new_type_error(format!(
"Unsupported operand types for '{}': '{}' and '{}'",
op,
a.class().name,
b.class().name
))
}
pub fn new_os_error(&self, msg: String) -> PyBaseExceptionRef {
let os_error = self.ctx.exceptions.os_error.clone();
self.new_exception_msg(os_error, msg)
}
pub fn new_unicode_decode_error(&self, msg: String) -> PyBaseExceptionRef {
let unicode_decode_error = self.ctx.exceptions.unicode_decode_error.clone();
self.new_exception_msg(unicode_decode_error, msg)
}
pub fn new_unicode_encode_error(&self, msg: String) -> PyBaseExceptionRef {
let unicode_encode_error = self.ctx.exceptions.unicode_encode_error.clone();
self.new_exception_msg(unicode_encode_error, msg)
}
pub fn new_value_error(&self, msg: String) -> PyBaseExceptionRef {
let value_error = self.ctx.exceptions.value_error.clone();
self.new_exception_msg(value_error, msg)
}
pub fn new_key_error(&self, obj: PyObjectRef) -> PyBaseExceptionRef {
let key_error = self.ctx.exceptions.key_error.clone();
self.new_exception(key_error, vec![obj])
}
pub fn new_index_error(&self, msg: String) -> PyBaseExceptionRef {
let index_error = self.ctx.exceptions.index_error.clone();
self.new_exception_msg(index_error, msg)
}
pub fn new_not_implemented_error(&self, msg: String) -> PyBaseExceptionRef {
let not_implemented_error = self.ctx.exceptions.not_implemented_error.clone();
self.new_exception_msg(not_implemented_error, msg)
}
pub fn new_recursion_error(&self, msg: String) -> PyBaseExceptionRef {
let recursion_error = self.ctx.exceptions.recursion_error.clone();
self.new_exception_msg(recursion_error, msg)
}
pub fn new_zero_division_error(&self, msg: String) -> PyBaseExceptionRef {
let zero_division_error = self.ctx.exceptions.zero_division_error.clone();
self.new_exception_msg(zero_division_error, msg)
}
pub fn new_overflow_error(&self, msg: String) -> PyBaseExceptionRef {
let overflow_error = self.ctx.exceptions.overflow_error.clone();
self.new_exception_msg(overflow_error, msg)
}
#[cfg(feature = "rustpython-compiler")]
pub fn new_syntax_error(&self, error: &CompileError) -> PyBaseExceptionRef {
let syntax_error_type = if error.is_indentation_error() {
self.ctx.exceptions.indentation_error.clone()
} else if error.is_tab_error() {
self.ctx.exceptions.tab_error.clone()
} else {
self.ctx.exceptions.syntax_error.clone()
};
let syntax_error = self.new_exception_msg(syntax_error_type, error.to_string());
let lineno = self.new_int(error.location.row());
let offset = self.new_int(error.location.column());
self.set_attr(syntax_error.as_object(), "lineno", lineno)
.unwrap();
self.set_attr(syntax_error.as_object(), "offset", offset)
.unwrap();
if let Some(v) = error.statement.as_ref() {
self.set_attr(syntax_error.as_object(), "text", self.new_str(v.to_owned()))
.unwrap();
}
if let Some(path) = error.source_path.as_ref() {
self.set_attr(
syntax_error.as_object(),
"filename",
self.new_str(path.to_owned()),
)
.unwrap();
}
syntax_error
}
pub fn new_import_error(&self, msg: String) -> PyBaseExceptionRef {
let import_error = self.ctx.exceptions.import_error.clone();
self.new_exception_msg(import_error, msg)
}
pub fn new_scope_with_builtins(&self) -> Scope {
Scope::with_builtins(None, self.ctx.new_dict(), self)
}
pub fn get_none(&self) -> PyObjectRef {
self.ctx.none()
}
pub fn is_none(&self, obj: &PyObjectRef) -> bool {
obj.is(&self.get_none())
}
pub fn get_type(&self) -> PyClassRef {
self.ctx.type_type()
}
pub fn get_object(&self) -> PyClassRef {
self.ctx.object()
}
pub fn get_locals(&self) -> PyDictRef {
self.current_scope().get_locals()
}
pub fn context(&self) -> &PyContext {
&self.ctx
}
pub fn to_str(&self, obj: &PyObjectRef) -> PyResult<PyStringRef> {
if obj.class().is(&self.ctx.types.str_type) {
Ok(obj.clone().downcast().unwrap())
} else {
let s = self.call_method(&obj, "__str__", vec![])?;
PyStringRef::try_from_object(self, s)
}
}
pub fn to_pystr<'a, T: Into<&'a PyObjectRef>>(&'a self, obj: T) -> PyResult<String> {
let py_str_obj = self.to_str(obj.into())?;
Ok(py_str_obj.as_str().to_owned())
}
pub fn to_repr(&self, obj: &PyObjectRef) -> PyResult<PyStringRef> {
let repr = self.call_method(obj, "__repr__", vec![])?;
TryFromObject::try_from_object(self, repr)
}
pub fn to_ascii(&self, obj: &PyObjectRef) -> PyResult {
let repr = self.call_method(obj, "__repr__", vec![])?;
let repr: PyStringRef = TryFromObject::try_from_object(self, repr)?;
let ascii = to_ascii(repr.as_str());
Ok(self.new_str(ascii))
}
pub fn import(&self, module: &str, from_list: &[String], level: usize) -> PyResult {
let weird = module.contains('.') || level != 0 || !from_list.is_empty();
let cached_module = if weird {
None
} else {
let sys_modules = self.get_attribute(self.sys_module.clone(), "modules")?;
sys_modules.get_item(module, self).ok()
};
match cached_module {
Some(module) => Ok(module),
None => {
let import_func = self
.get_attribute(self.builtins.clone(), "__import__")
.map_err(|_| self.new_import_error("__import__ not found".to_owned()))?;
let (locals, globals) = if let Some(frame) = self.current_frame() {
(
frame.scope.get_locals().into_object(),
frame.scope.globals.clone().into_object(),
)
} else {
(self.get_none(), self.get_none())
};
let from_list = self.ctx.new_tuple(
from_list
.iter()
.map(|name| self.new_str(name.to_owned()))
.collect(),
);
self.invoke(
&import_func,
vec![
self.new_str(module.to_owned()),
globals,
locals,
from_list,
self.ctx.new_int(level),
],
)
.map_err(|exc| import::remove_importlib_frames(self, &exc))
}
}
}
pub fn isinstance(&self, obj: &PyObjectRef, cls: &PyClassRef) -> PyResult<bool> {
if Rc::ptr_eq(&obj.class().into_object(), cls.as_object()) {
Ok(true)
} else {
let ret = self.call_method(cls.as_object(), "__instancecheck__", vec![obj.clone()])?;
objbool::boolval(self, ret)
}
}
pub fn issubclass(&self, subclass: &PyClassRef, cls: &PyClassRef) -> PyResult<bool> {
let ret = self.call_method(
cls.as_object(),
"__subclasscheck__",
vec![subclass.clone().into_object()],
)?;
objbool::boolval(self, ret)
}
pub fn call_get_descriptor(&self, descr: PyObjectRef, obj: PyObjectRef) -> Option<PyResult> {
let descr_class = descr.class();
let slots = descr_class.slots.borrow();
Some(if let Some(descr_get) = slots.borrow().descr_get.as_ref() {
let cls = obj.class();
descr_get(
self,
descr,
Some(obj.clone()),
OptionalArg::Present(cls.into_object()),
)
} else if let Some(ref descriptor) = descr_class.get_attr("__get__") {
let cls = obj.class();
self.invoke(descriptor, vec![descr, obj.clone(), cls.into_object()])
} else {
return None;
})
}
pub fn call_if_get_descriptor(&self, attr: PyObjectRef, obj: PyObjectRef) -> PyResult {
self.call_get_descriptor(attr.clone(), obj)
.unwrap_or(Ok(attr))
}
pub fn call_method<T>(&self, obj: &PyObjectRef, method_name: &str, args: T) -> PyResult
where
T: Into<PyFuncArgs>,
{
flame_guard!(format!("call_method({:?})", method_name));
let cls = obj.class();
match cls.get_attr(method_name) {
Some(func) => {
vm_trace!(
"vm.call_method {:?} {:?} {:?} -> {:?}",
obj,
cls,
method_name,
func
);
let wrapped = self.call_if_get_descriptor(func, obj.clone())?;
self.invoke(&wrapped, args)
}
None => Err(self.new_type_error(format!("Unsupported method: {}", method_name))),
}
}
fn _invoke(&self, callable: &PyObjectRef, args: PyFuncArgs) -> PyResult {
vm_trace!("Invoke: {:?} {:?}", callable, args);
let class = callable.class();
let slots = class.slots.borrow();
if let Some(slot_call) = slots.borrow().call.as_ref() {
self.trace_event(TraceEvent::Call)?;
let args = args.insert(callable.clone());
let result = slot_call(self, args);
self.trace_event(TraceEvent::Return)?;
result
} else if class.has_attr("__call__") {
let result = self.call_method(&callable, "__call__", args);
result
} else {
Err(self.new_type_error(format!(
"'{}' object is not callable",
callable.class().name
)))
}
}
#[inline]
pub fn invoke<T>(&self, func_ref: &PyObjectRef, args: T) -> PyResult
where
T: Into<PyFuncArgs>,
{
let res = self._invoke(func_ref, args.into());
res
}
fn trace_event(&self, event: TraceEvent) -> PyResult<()> {
if *self.use_tracing.borrow() {
let frame = self.get_none();
let event = self.new_str(event.to_string());
let arg = self.get_none();
let args = vec![frame, event, arg];
let trace_func = self.trace_func.borrow().clone();
if !self.is_none(&trace_func) {
self.use_tracing.replace(false);
let res = self.invoke(&trace_func, args.clone());
self.use_tracing.replace(true);
res?;
}
let profile_func = self.profile_func.borrow().clone();
if !self.is_none(&profile_func) {
self.use_tracing.replace(false);
let res = self.invoke(&profile_func, args);
self.use_tracing.replace(true);
res?;
}
}
Ok(())
}
pub fn extract_elements<T: TryFromObject>(&self, value: &PyObjectRef) -> PyResult<Vec<T>> {
let cls = value.class();
if cls.is(&self.ctx.tuple_type()) {
value
.payload::<PyTuple>()
.unwrap()
.as_slice()
.iter()
.map(|obj| T::try_from_object(self, obj.clone()))
.collect()
} else if cls.is(&self.ctx.list_type()) {
value
.payload::<PyList>()
.unwrap()
.borrow_elements()
.iter()
.map(|obj| T::try_from_object(self, obj.clone()))
.collect()
} else {
let iter = objiter::get_iter(self, value)?;
objiter::get_all(self, &iter)
}
}
#[cfg_attr(feature = "flame-it", flame("VirtualMachine"))]
pub fn get_attribute<T>(&self, obj: PyObjectRef, attr_name: T) -> PyResult
where
T: TryIntoRef<PyString>,
{
let attr_name = attr_name.try_into_ref(self)?;
vm_trace!("vm.__getattribute__: {:?} {:?}", obj, attr_name);
self.call_method(&obj, "__getattribute__", vec![attr_name.into_object()])
}
pub fn set_attr<K, V>(&self, obj: &PyObjectRef, attr_name: K, attr_value: V) -> PyResult
where
K: TryIntoRef<PyString>,
V: Into<PyObjectRef>,
{
let attr_name = attr_name.try_into_ref(self)?;
self.call_method(
obj,
"__setattr__",
vec![attr_name.into_object(), attr_value.into()],
)
}
pub fn del_attr(&self, obj: &PyObjectRef, attr_name: PyObjectRef) -> PyResult<()> {
self.call_method(&obj, "__delattr__", vec![attr_name])?;
Ok(())
}
pub fn get_method_or_type_error<F>(
&self,
obj: PyObjectRef,
method_name: &str,
err_msg: F,
) -> PyResult
where
F: FnOnce() -> String,
{
let cls = obj.class();
match cls.get_attr(method_name) {
Some(method) => self.call_if_get_descriptor(method, obj.clone()),
None => Err(self.new_type_error(err_msg())),
}
}
pub fn get_method(&self, obj: PyObjectRef, method_name: &str) -> Option<PyResult> {
let cls = obj.class();
let method = cls.get_attr(method_name)?;
Some(self.call_if_get_descriptor(method, obj.clone()))
}
pub fn call_or_unsupported<F>(
&self,
obj: PyObjectRef,
arg: PyObjectRef,
method: &str,
unsupported: F,
) -> PyResult
where
F: Fn(&VirtualMachine, PyObjectRef, PyObjectRef) -> PyResult,
{
if let Some(method_or_err) = self.get_method(obj.clone(), method) {
let method = method_or_err?;
let result = self.invoke(&method, vec![arg.clone()])?;
if !result.is(&self.ctx.not_implemented()) {
return Ok(result);
}
}
unsupported(self, obj, arg)
}
pub fn call_or_reflection(
&self,
lhs: PyObjectRef,
rhs: PyObjectRef,
default: &str,
reflection: &str,
unsupported: fn(&VirtualMachine, PyObjectRef, PyObjectRef) -> PyResult,
) -> PyResult {
self.call_or_unsupported(lhs, rhs, default, move |vm, lhs, rhs| {
vm.call_or_unsupported(rhs, lhs, reflection, unsupported)
})
}
pub fn generic_getattribute(
&self,
obj: PyObjectRef,
name_str: PyStringRef,
) -> PyResult<Option<PyObjectRef>> {
let name = name_str.as_str();
let cls = obj.class();
if let Some(attr) = cls.get_attr(&name) {
let attr_class = attr.class();
if attr_class.has_attr("__set__") {
if let Some(r) = self.call_get_descriptor(attr, obj.clone()) {
return r.map(Some);
}
}
}
let attr = if let Some(ref dict) = obj.dict {
dict.borrow().get_item_option(name_str.as_str(), self)?
} else {
None
};
if let Some(obj_attr) = attr {
Ok(Some(obj_attr))
} else if let Some(attr) = cls.get_attr(&name) {
self.call_if_get_descriptor(attr, obj).map(Some)
} else if let Some(getter) = cls.get_attr("__getattr__") {
self.invoke(&getter, vec![obj, name_str.into_object()])
.map(Some)
} else {
Ok(None)
}
}
pub fn is_callable(&self, obj: &PyObjectRef) -> bool {
obj.class().slots.borrow().call.is_some() || obj.class().has_attr("__call__")
}
#[inline]
pub fn check_signals(&self) -> PyResult<()> {
#[cfg(not(target_arch = "wasm32"))]
{
crate::stdlib::signal::check_signals(self)
}
#[cfg(target_arch = "wasm32")]
{
Ok(())
}
}
#[cfg(feature = "rustpython-compiler")]
pub fn compile(
&self,
source: &str,
mode: compile::Mode,
source_path: String,
) -> Result<PyCodeRef, CompileError> {
compile::compile(source, mode, source_path, self.settings.optimize)
.map(|codeobj| PyCode::new(codeobj).into_ref(self))
.map_err(|mut compile_error| {
compile_error.update_statement_info(source.trim_end().to_owned());
compile_error
})
}
fn call_codec_func(
&self,
func: &str,
obj: PyObjectRef,
encoding: Option<PyStringRef>,
errors: Option<PyStringRef>,
) -> PyResult {
let codecsmodule = self.import("_codecs", &[], 0)?;
let func = self.get_attribute(codecsmodule, func)?;
let mut args = vec![
obj,
encoding.map_or_else(|| self.get_none(), |s| s.into_object()),
];
if let Some(errors) = errors {
args.push(errors.into_object());
}
self.invoke(&func, args)
}
pub fn decode(
&self,
obj: PyObjectRef,
encoding: Option<PyStringRef>,
errors: Option<PyStringRef>,
) -> PyResult {
self.call_codec_func("decode", obj, encoding, errors)
}
pub fn encode(
&self,
obj: PyObjectRef,
encoding: Option<PyStringRef>,
errors: Option<PyStringRef>,
) -> PyResult {
self.call_codec_func("encode", obj, encoding, errors)
}
pub fn _sub(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self.call_or_reflection(a, b, "__sub__", "__rsub__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, "-"))
})
}
pub fn _isub(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self.call_or_unsupported(a, b, "__isub__", |vm, a, b| {
vm.call_or_reflection(a, b, "__sub__", "__rsub__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, "-="))
})
})
}
pub fn _add(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self.call_or_reflection(a, b, "__add__", "__radd__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, "+"))
})
}
pub fn _iadd(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self.call_or_unsupported(a, b, "__iadd__", |vm, a, b| {
vm.call_or_reflection(a, b, "__add__", "__radd__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, "+="))
})
})
}
pub fn _mul(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self.call_or_reflection(a, b, "__mul__", "__rmul__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, "*"))
})
}
pub fn _imul(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self.call_or_unsupported(a, b, "__imul__", |vm, a, b| {
vm.call_or_reflection(a, b, "__mul__", "__rmul__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, "*="))
})
})
}
pub fn _matmul(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self.call_or_reflection(a, b, "__matmul__", "__rmatmul__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, "@"))
})
}
pub fn _imatmul(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self.call_or_unsupported(a, b, "__imatmul__", |vm, a, b| {
vm.call_or_reflection(a, b, "__matmul__", "__rmatmul__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, "@="))
})
})
}
pub fn _truediv(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self.call_or_reflection(a, b, "__truediv__", "__rtruediv__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, "/"))
})
}
pub fn _itruediv(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self.call_or_unsupported(a, b, "__itruediv__", |vm, a, b| {
vm.call_or_reflection(a, b, "__truediv__", "__rtruediv__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, "/="))
})
})
}
pub fn _floordiv(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self.call_or_reflection(a, b, "__floordiv__", "__rfloordiv__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, "//"))
})
}
pub fn _ifloordiv(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self.call_or_unsupported(a, b, "__ifloordiv__", |vm, a, b| {
vm.call_or_reflection(a, b, "__floordiv__", "__rfloordiv__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, "//="))
})
})
}
pub fn _pow(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self.call_or_reflection(a, b, "__pow__", "__rpow__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, "**"))
})
}
pub fn _ipow(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self.call_or_unsupported(a, b, "__ipow__", |vm, a, b| {
vm.call_or_reflection(a, b, "__pow__", "__rpow__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, "**="))
})
})
}
pub fn _mod(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self.call_or_reflection(a, b, "__mod__", "__rmod__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, "%"))
})
}
pub fn _imod(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self.call_or_unsupported(a, b, "__imod__", |vm, a, b| {
vm.call_or_reflection(a, b, "__mod__", "__rmod__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, "%="))
})
})
}
pub fn _lshift(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self.call_or_reflection(a, b, "__lshift__", "__rlshift__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, "<<"))
})
}
pub fn _ilshift(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self.call_or_unsupported(a, b, "__ilshift__", |vm, a, b| {
vm.call_or_reflection(a, b, "__lshift__", "__rlshift__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, "<<="))
})
})
}
pub fn _rshift(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self.call_or_reflection(a, b, "__rshift__", "__rrshift__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, ">>"))
})
}
pub fn _irshift(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self.call_or_unsupported(a, b, "__irshift__", |vm, a, b| {
vm.call_or_reflection(a, b, "__rshift__", "__rrshift__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, ">>="))
})
})
}
pub fn _xor(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self.call_or_reflection(a, b, "__xor__", "__rxor__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, "^"))
})
}
pub fn _ixor(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self.call_or_unsupported(a, b, "__ixor__", |vm, a, b| {
vm.call_or_reflection(a, b, "__xor__", "__rxor__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, "^="))
})
})
}
pub fn _or(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self.call_or_reflection(a, b, "__or__", "__ror__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, "|"))
})
}
pub fn _ior(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self.call_or_unsupported(a, b, "__ior__", |vm, a, b| {
vm.call_or_reflection(a, b, "__or__", "__ror__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, "|="))
})
})
}
pub fn _and(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self.call_or_reflection(a, b, "__and__", "__rand__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, "&"))
})
}
pub fn _iand(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self.call_or_unsupported(a, b, "__iand__", |vm, a, b| {
vm.call_or_reflection(a, b, "__and__", "__rand__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, "&="))
})
})
}
fn _cmp<F>(
&self,
v: PyObjectRef,
w: PyObjectRef,
op: &str,
swap_op: &str,
default: F,
) -> PyResult
where
F: Fn(&VirtualMachine, PyObjectRef, PyObjectRef) -> PyResult,
{
let mut checked_reverse_op = false;
if !v.typ.is(&w.typ) && objtype::issubclass(&w.class(), &v.class()) {
if let Some(method_or_err) = self.get_method(w.clone(), swap_op) {
let method = method_or_err?;
checked_reverse_op = true;
let result = self.invoke(&method, vec![v.clone()])?;
if !result.is(&self.ctx.not_implemented()) {
return Ok(result);
}
}
}
self.call_or_unsupported(v, w, op, |vm, v, w| {
if !checked_reverse_op {
self.call_or_unsupported(w, v, swap_op, |vm, v, w| default(vm, v, w))
} else {
default(vm, v, w)
}
})
}
pub fn _eq(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self._cmp(a, b, "__eq__", "__eq__", |vm, a, b| {
Ok(vm.new_bool(a.is(&b)))
})
}
pub fn _ne(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self._cmp(a, b, "__ne__", "__ne__", |vm, a, b| {
Ok(vm.new_bool(!a.is(&b)))
})
}
pub fn _lt(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self._cmp(a, b, "__lt__", "__gt__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, "<"))
})
}
pub fn _le(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self._cmp(a, b, "__le__", "__ge__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, "<="))
})
}
pub fn _gt(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self._cmp(a, b, "__gt__", "__lt__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, ">"))
})
}
pub fn _ge(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult {
self._cmp(a, b, "__ge__", "__le__", |vm, a, b| {
Err(vm.new_unsupported_operand_error(a, b, ">="))
})
}
pub fn _hash(&self, obj: &PyObjectRef) -> PyResult<pyhash::PyHash> {
let hash_obj = self.call_method(obj, "__hash__", vec![])?;
if let Some(hash_value) = hash_obj.payload_if_subclass::<PyInt>(self) {
Ok(hash_value.hash())
} else {
Err(self.new_type_error("__hash__ method should return an integer".to_owned()))
}
}
fn _membership_iter_search(&self, haystack: PyObjectRef, needle: PyObjectRef) -> PyResult {
let iter = objiter::get_iter(self, &haystack)?;
loop {
if let Some(element) = objiter::get_next_object(self, &iter)? {
if self.bool_eq(needle.clone(), element.clone())? {
return Ok(self.new_bool(true));
} else {
continue;
}
} else {
return Ok(self.new_bool(false));
}
}
}
pub fn _membership(&self, haystack: PyObjectRef, needle: PyObjectRef) -> PyResult {
if let Some(method_or_err) = self.get_method(haystack.clone(), "__contains__") {
let method = method_or_err?;
self.invoke(&method, vec![needle])
} else {
self._membership_iter_search(haystack, needle)
}
}
pub fn push_exception(&self, exc: PyBaseExceptionRef) {
self.exceptions.borrow_mut().push(exc)
}
pub fn pop_exception(&self) -> Option<PyBaseExceptionRef> {
self.exceptions.borrow_mut().pop()
}
pub fn current_exception(&self) -> Option<PyBaseExceptionRef> {
self.exceptions.borrow().last().cloned()
}
pub fn bool_eq(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult<bool> {
let eq = self._eq(a, b)?;
let value = objbool::boolval(self, eq)?;
Ok(value)
}
pub fn identical_or_equal(&self, a: &PyObjectRef, b: &PyObjectRef) -> PyResult<bool> {
if a.is(b) {
Ok(true)
} else {
self.bool_eq(a.clone(), b.clone())
}
}
pub fn bool_seq_lt(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult<Option<bool>> {
let value = if objbool::boolval(self, self._lt(a.clone(), b.clone())?)? {
Some(true)
} else if !objbool::boolval(self, self._eq(a.clone(), b.clone())?)? {
Some(false)
} else {
None
};
Ok(value)
}
pub fn bool_seq_gt(&self, a: PyObjectRef, b: PyObjectRef) -> PyResult<Option<bool>> {
let value = if objbool::boolval(self, self._gt(a.clone(), b.clone())?)? {
Some(true)
} else if !objbool::boolval(self, self._eq(a.clone(), b.clone())?)? {
Some(false)
} else {
None
};
Ok(value)
}
#[doc(hidden)]
pub fn __module_set_attr(
&self,
module: &PyObjectRef,
attr_name: impl TryIntoRef<PyString>,
attr_value: impl Into<PyObjectRef>,
) -> PyResult<()> {
let val = attr_value.into();
objobject::setattr(module.clone(), attr_name.try_into_ref(self)?, val, self)
}
}
impl Default for VirtualMachine {
fn default() -> Self {
VirtualMachine::new(Default::default())
}
}
static REPR_GUARDS: Lazy<Mutex<HashSet<usize>>> = Lazy::new(Mutex::default);
pub struct ReprGuard {
id: usize,
}
impl ReprGuard {
fn get_guards<'a>() -> MutexGuard<'a, HashSet<usize>> {
REPR_GUARDS.lock().expect("ReprGuard lock poisoned")
}
pub fn enter(obj: &PyObjectRef) -> Option<ReprGuard> {
let mut guards = ReprGuard::get_guards();
let id = obj.get_id();
if guards.contains(&id) {
return None;
}
guards.insert(id);
Some(ReprGuard { id })
}
}
impl Drop for ReprGuard {
fn drop(&mut self) {
ReprGuard::get_guards().remove(&self.id);
}
}
#[cfg(test)]
mod tests {
use super::VirtualMachine;
use crate::obj::{objint, objstr};
use num_bigint::ToBigInt;
#[test]
fn test_add_py_integers() {
let vm: VirtualMachine = Default::default();
let a = vm.ctx.new_int(33_i32);
let b = vm.ctx.new_int(12_i32);
let res = vm._add(a, b).unwrap();
let value = objint::get_value(&res);
assert_eq!(*value, 45_i32.to_bigint().unwrap());
}
#[test]
fn test_multiply_str() {
let vm: VirtualMachine = Default::default();
let a = vm.ctx.new_str(String::from("Hello "));
let b = vm.ctx.new_int(4_i32);
let res = vm._mul(a, b).unwrap();
let value = objstr::borrow_value(&res);
assert_eq!(value, String::from("Hello Hello Hello Hello "))
}
}