use std::collections::HashMap;
use std::mem;
use std::ops::RangeInclusive;
use indexmap::IndexMap;
use result_like::impl_option_like;
use crate::exceptions::PyBaseExceptionRef;
use crate::obj::objtuple::PyTuple;
use crate::obj::objtype::{isinstance, PyClassRef};
use crate::pyobject::{
IntoPyObject, PyObjectRef, PyRef, PyResult, PyValue, TryFromObject, TypeProtocol,
};
use crate::vm::VirtualMachine;
use self::OptionalArg::*;
#[derive(Debug, Default, Clone)]
pub struct PyFuncArgs {
pub args: Vec<PyObjectRef>,
pub kwargs: IndexMap<String, PyObjectRef>,
}
impl From<Vec<PyObjectRef>> for PyFuncArgs {
fn from(args: Vec<PyObjectRef>) -> Self {
PyFuncArgs {
args,
kwargs: IndexMap::new(),
}
}
}
impl From<PyObjectRef> for PyFuncArgs {
fn from(arg: PyObjectRef) -> Self {
PyFuncArgs {
args: vec![arg],
kwargs: IndexMap::new(),
}
}
}
impl From<(&Args, &KwArgs)> for PyFuncArgs {
fn from(arg: (&Args, &KwArgs)) -> Self {
let Args(args) = arg.0;
let KwArgs(kwargs) = arg.1;
PyFuncArgs {
args: args.clone(),
kwargs: kwargs.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
}
}
}
impl FromArgs for PyFuncArgs {
fn from_args(_vm: &VirtualMachine, args: &mut PyFuncArgs) -> Result<Self, ArgumentError> {
Ok(mem::replace(args, Default::default()))
}
}
impl PyFuncArgs {
pub fn new(mut args: Vec<PyObjectRef>, kwarg_names: Vec<String>) -> PyFuncArgs {
let kwarg_values = args.drain((args.len() - kwarg_names.len())..);
let mut kwargs = IndexMap::new();
for (name, value) in kwarg_names.iter().zip(kwarg_values) {
kwargs.insert(name.clone(), value);
}
PyFuncArgs { args, kwargs }
}
pub fn insert(&self, item: PyObjectRef) -> PyFuncArgs {
let mut args = PyFuncArgs {
args: self.args.clone(),
kwargs: self.kwargs.clone(),
};
args.args.insert(0, item);
args
}
pub fn shift(&mut self) -> PyObjectRef {
self.args.remove(0)
}
pub fn get_kwarg(&self, key: &str, default: PyObjectRef) -> PyObjectRef {
self.kwargs
.get(key)
.cloned()
.unwrap_or_else(|| default.clone())
}
pub fn get_optional_kwarg(&self, key: &str) -> Option<PyObjectRef> {
self.kwargs.get(key).cloned()
}
pub fn get_optional_kwarg_with_type(
&self,
key: &str,
ty: PyClassRef,
vm: &VirtualMachine,
) -> PyResult<Option<PyObjectRef>> {
match self.get_optional_kwarg(key) {
Some(kwarg) => {
if isinstance(&kwarg, &ty) {
Ok(Some(kwarg))
} else {
let expected_ty_name = vm.to_pystr(&ty)?;
let actual_ty_name = vm.to_pystr(&kwarg.class())?;
Err(vm.new_type_error(format!(
"argument of type {} is required for named parameter `{}` (got: {})",
expected_ty_name, key, actual_ty_name
)))
}
}
None => Ok(None),
}
}
pub fn take_positional(&mut self) -> Option<PyObjectRef> {
if self.args.is_empty() {
None
} else {
Some(self.args.remove(0))
}
}
pub fn take_positional_keyword(&mut self, name: &str) -> Option<PyObjectRef> {
self.take_positional().or_else(|| self.take_keyword(name))
}
pub fn take_keyword(&mut self, name: &str) -> Option<PyObjectRef> {
self.kwargs.swap_remove(name)
}
pub fn remaining_keywords<'a>(
&'a mut self,
) -> impl Iterator<Item = (String, PyObjectRef)> + 'a {
self.kwargs.drain(..)
}
pub fn bind<T: FromArgs>(mut self, vm: &VirtualMachine) -> PyResult<T> {
let given_args = self.args.len();
let bound = match T::from_args(vm, &mut self) {
Ok(args) => args,
Err(ArgumentError::TooFewArgs) => {
return Err(vm.new_type_error(format!(
"Expected at least {} arguments ({} given)",
T::arity().start(),
given_args,
)));
}
Err(ArgumentError::TooManyArgs) => {
return Err(vm.new_type_error(format!(
"Expected at most {} arguments ({} given)",
T::arity().end(),
given_args,
)));
}
Err(ArgumentError::InvalidKeywordArgument(name)) => {
return Err(vm.new_type_error(format!("{} is an invalid keyword argument", name)));
}
Err(ArgumentError::RequiredKeywordArgument(name)) => {
return Err(vm.new_type_error(format!("Required keyqord only argument {}", name)));
}
Err(ArgumentError::Exception(ex)) => {
return Err(ex);
}
};
if !self.args.is_empty() {
Err(vm.new_type_error(format!(
"Expected at most {} arguments ({} given)",
T::arity().end(),
given_args,
)))
} else if !self.kwargs.is_empty() {
Err(vm.new_type_error(format!(
"Unexpected keyword argument {}",
self.kwargs.keys().next().unwrap()
)))
} else {
Ok(bound)
}
}
}
pub enum ArgumentError {
TooFewArgs,
TooManyArgs,
InvalidKeywordArgument(String),
RequiredKeywordArgument(String),
Exception(PyBaseExceptionRef),
}
impl From<PyBaseExceptionRef> for ArgumentError {
fn from(ex: PyBaseExceptionRef) -> Self {
ArgumentError::Exception(ex)
}
}
pub trait FromArgs: Sized {
fn arity() -> RangeInclusive<usize> {
0..=0
}
fn from_args(vm: &VirtualMachine, args: &mut PyFuncArgs) -> Result<Self, ArgumentError>;
}
pub struct KwArgs<T = PyObjectRef>(HashMap<String, T>);
impl<T> KwArgs<T> {
pub fn pop_kwarg(&mut self, name: &str) -> Option<T> {
self.0.remove(name)
}
}
impl<T> FromArgs for KwArgs<T>
where
T: TryFromObject,
{
fn from_args(vm: &VirtualMachine, args: &mut PyFuncArgs) -> Result<Self, ArgumentError> {
let mut kwargs = HashMap::new();
for (name, value) in args.remaining_keywords() {
kwargs.insert(name, T::try_from_object(vm, value)?);
}
Ok(KwArgs(kwargs))
}
}
impl<T> IntoIterator for KwArgs<T> {
type Item = (String, T);
type IntoIter = std::collections::hash_map::IntoIter<String, T>;
fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}
#[derive(Clone)]
pub struct Args<T = PyObjectRef>(Vec<T>);
impl<T> Args<T> {
pub fn into_vec(self) -> Vec<T> {
self.0
}
}
impl<T: PyValue> Args<PyRef<T>> {
pub fn into_tuple(self, vm: &VirtualMachine) -> PyObjectRef {
vm.ctx
.new_tuple(self.0.into_iter().map(PyRef::into_object).collect())
}
}
impl<T> FromArgs for Args<T>
where
T: TryFromObject,
{
fn from_args(vm: &VirtualMachine, args: &mut PyFuncArgs) -> Result<Self, ArgumentError> {
let mut varargs = Vec::new();
while let Some(value) = args.take_positional() {
varargs.push(T::try_from_object(vm, value)?);
}
Ok(Args(varargs))
}
}
impl<T> IntoIterator for Args<T> {
type Item = T;
type IntoIter = std::vec::IntoIter<T>;
fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}
impl<T> FromArgs for T
where
T: TryFromObject,
{
fn arity() -> RangeInclusive<usize> {
1..=1
}
fn from_args(vm: &VirtualMachine, args: &mut PyFuncArgs) -> Result<Self, ArgumentError> {
if let Some(value) = args.take_positional() {
Ok(T::try_from_object(vm, value)?)
} else {
Err(ArgumentError::TooFewArgs)
}
}
}
#[derive(Debug, is_macro::Is)]
pub enum OptionalArg<T = PyObjectRef> {
Present(T),
Missing,
}
impl_option_like!(OptionalArg, Present, Missing);
pub type OptionalOption<T> = OptionalArg<Option<T>>;
impl<T> OptionalOption<T> {
#[inline]
pub fn flat_option(self) -> Option<T> {
match self {
Present(Some(value)) => Some(value),
_ => None,
}
}
}
impl<T> FromArgs for OptionalArg<T>
where
T: TryFromObject,
{
fn arity() -> RangeInclusive<usize> {
0..=1
}
fn from_args(vm: &VirtualMachine, args: &mut PyFuncArgs) -> Result<Self, ArgumentError> {
if let Some(value) = args.take_positional() {
Ok(Present(T::try_from_object(vm, value)?))
} else {
Ok(Missing)
}
}
}
impl FromArgs for () {
fn from_args(_vm: &VirtualMachine, _args: &mut PyFuncArgs) -> Result<Self, ArgumentError> {
Ok(())
}
}
macro_rules! tuple_from_py_func_args {
($($T:ident),+) => {
impl<$($T),+> FromArgs for ($($T,)+)
where
$($T: FromArgs),+
{
fn arity() -> RangeInclusive<usize> {
let mut min = 0;
let mut max = 0;
$(
let (start, end) = $T::arity().into_inner();
min += start;
max += end;
)+
min..=max
}
fn from_args(vm: &VirtualMachine, args: &mut PyFuncArgs) -> Result<Self, ArgumentError> {
Ok(($($T::from_args(vm, args)?,)+))
}
}
};
}
tuple_from_py_func_args!(A);
tuple_from_py_func_args!(A, B);
tuple_from_py_func_args!(A, B, C);
tuple_from_py_func_args!(A, B, C, D);
tuple_from_py_func_args!(A, B, C, D, E);
tuple_from_py_func_args!(A, B, C, D, E, F);
pub type PyNativeFunc = Box<dyn Fn(&VirtualMachine, PyFuncArgs) -> PyResult + 'static>;
pub trait IntoPyNativeFunc<T, R, VM> {
fn into_func(self) -> PyNativeFunc;
}
impl<F> IntoPyNativeFunc<PyFuncArgs, PyResult, VirtualMachine> for F
where
F: Fn(&VirtualMachine, PyFuncArgs) -> PyResult + 'static,
{
fn into_func(self) -> PyNativeFunc {
Box::new(self)
}
}
pub struct OwnedParam<T>(std::marker::PhantomData<T>);
pub struct RefParam<T>(std::marker::PhantomData<T>);
macro_rules! into_py_native_func_tuple {
($(($n:tt, $T:ident)),*) => {
impl<F, $($T,)* R> IntoPyNativeFunc<($(OwnedParam<$T>,)*), R, VirtualMachine> for F
where
F: Fn($($T,)* &VirtualMachine) -> R + 'static,
$($T: FromArgs,)*
R: IntoPyObject,
{
fn into_func(self) -> PyNativeFunc {
Box::new(move |vm, args| {
let ($($n,)*) = args.bind::<($($T,)*)>(vm)?;
(self)($($n,)* vm).into_pyobject(vm)
})
}
}
impl<F, S, $($T,)* R> IntoPyNativeFunc<(RefParam<S>, $(OwnedParam<$T>,)*), R, VirtualMachine> for F
where
F: Fn(&S, $($T,)* &VirtualMachine) -> R + 'static,
S: PyValue,
$($T: FromArgs,)*
R: IntoPyObject,
{
fn into_func(self) -> PyNativeFunc {
Box::new(move |vm, args| {
let (zelf, $($n,)*) = args.bind::<(PyRef<S>, $($T,)*)>(vm)?;
(self)(&zelf, $($n,)* vm).into_pyobject(vm)
})
}
}
impl<F, $($T,)* R> IntoPyNativeFunc<($(OwnedParam<$T>,)*), R, ()> for F
where
F: Fn($($T,)*) -> R + 'static,
$($T: FromArgs,)*
R: IntoPyObject,
{
fn into_func(self) -> PyNativeFunc {
IntoPyNativeFunc::into_func(move |$($n,)* _vm: &VirtualMachine| (self)($($n,)*))
}
}
impl<F, S, $($T,)* R> IntoPyNativeFunc<(RefParam<S>, $(OwnedParam<$T>,)*), R, ()> for F
where
F: Fn(&S, $($T,)*) -> R + 'static,
S: PyValue,
$($T: FromArgs,)*
R: IntoPyObject,
{
fn into_func(self) -> PyNativeFunc {
IntoPyNativeFunc::into_func(move |zelf: &S, $($n,)* _vm: &VirtualMachine| (self)(zelf, $($n,)*))
}
}
};
}
into_py_native_func_tuple!();
into_py_native_func_tuple!((a, A));
into_py_native_func_tuple!((a, A), (b, B));
into_py_native_func_tuple!((a, A), (b, B), (c, C));
into_py_native_func_tuple!((a, A), (b, B), (c, C), (d, D));
into_py_native_func_tuple!((a, A), (b, B), (c, C), (d, D), (e, E));
pub fn single_or_tuple_any<T: PyValue, F: Fn(PyRef<T>) -> PyResult<bool>>(
obj: PyObjectRef,
predicate: F,
message: fn(&PyObjectRef) -> String,
vm: &VirtualMachine,
) -> PyResult<bool> {
use std::marker::PhantomData;
struct Checker<'vm, T: PyValue, F: Fn(PyRef<T>) -> PyResult<bool>> {
predicate: F,
message: fn(&PyObjectRef) -> String,
vm: &'vm VirtualMachine,
t: PhantomData<T>,
}
impl<T: PyValue, F: Fn(PyRef<T>) -> PyResult<bool>> Checker<'_, T, F> {
fn check(&self, obj: PyObjectRef) -> PyResult<bool> {
match_class!(match obj {
obj @ T => (self.predicate)(obj),
tuple @ PyTuple => {
for obj in tuple.as_slice().iter() {
if self.check(obj.clone())? {
return Ok(true);
}
}
Ok(false)
}
obj => Err(self.vm.new_type_error((self.message)(&obj))),
})
}
}
let checker = Checker {
predicate,
message,
vm,
t: PhantomData,
};
checker.check(obj)
}