"""
Parallel computations utility module
====================================
Defines low-level functions to deal with parallel computation tasks.
Warnings
--------
Those are low-level computation routines which are not user-friendly.
Usage must be thoroughly tested.
"""
import warnings
import threading
import _thread as thread
from concurrent.futures import TimeoutError
from pebble import ProcessExpired
from sympy.utilities.iterables import multiset_permutations
from layercake.utils.integration import symbolic_integration, numerical_integration
[docs]
def parallel_integration(pool, args_list, substitutions, destination, timeout, permute=False, symbolic_int=False):
"""Functions to integrate |Sympy| expressions, either symbolically or numerically, in parallel.
Parameters
----------
pool: pebble.ProcessPool
A Pebble pool of workers.
args_list: list(tuple)
A list of tuples with the following arguments for the integration subfunctions:
* `indices`: Tuple of integers labelling the integrations in the integration queue.
Will be returned by the worker.
* `integrals_definition`: A callable returning the integral(s) as a |Sympy| expression.
* `integrals_arguments`: A tuple with the arguments to be provided to the `integrals_definition` callable.
substitutions: list(tuple)
List of 2-tuples containing extra symbolic substitutions to be made at the end of the integral computation.
The 2-tuples contain first a |Sympy| expression and then the value to substitute.
destination: None or sparse.DOK or ~numpy.ndarray
Place where to store the output. If an array is provided, it will append the output of the integrations to it.
If `None`, it will create a new dictionary and return it.
If `symbolic_int` is `True`, then `destination` should be `None`.
timeout: None or bool or int
Control the switch from symbolic to numerical integration. By default, `parallel_integration` workers will try to integrate
|Sympy| expressions symbolically, but a fallback to numerical integration can be enforced.
The options are:
* `None`: This is the "full-symbolic" mode. No timeout will be applied, and the switch to numerical integration will never happen.
Can result in very long and improbable computation time.
* `True`: This is the "full-numerical" mode. Symbolic computations do not occur, and the workers try directly to integrate
numerically.
* `False`: Same as `None`.
* An integer: defines a timeout after which, if a symbolic integration have not completed, the worker switch to the
numerical integration.
permute: bool, optional
Permute the indices provided, except the first one, and return the result for all these indices.
Default to `False`.
symbolic_int: bool, optional
Force symbolic integration and do not substitute the substitutions at the end, making the output a list of |Sympy| expressions.
Default to `False`.
Returns
-------
tuple(2-tuple)
A list with the results, as 2-tuple with the labelling indices and the output of the integration, either as a float or as
a |Sympy| integration.
"""
if destination is None:
return_dict = True
destination = dict()
else:
return_dict = False
if timeout is False or symbolic_int:
timeout = None
if timeout is not True:
new_args_list = [tuple(list(args)+[substitutions]) for args in args_list]
future = pool.map(symbolic_integration, new_args_list, timeout=timeout)
results = future.result()
num_args_list = list()
i = 0
while True:
try:
res = next(results)
if symbolic_int:
expr = res[1].simplify()
if permute:
i = res[0][0]
idx = res[0][1:]
perm_idx = multiset_permutations(idx)
for perm in perm_idx:
idx = [i] + perm
destination[tuple(idx)] = expr
else:
destination[res[0]] = expr
else:
destination[res[0]] = float(res[1].subs(substitutions))
expr_subs = float(res[1].subs(substitutions))
if permute:
i = res[0][0]
idx = res[0][1:]
perm_idx = multiset_permutations(idx)
for perm in perm_idx:
idx = [i] + perm
destination[tuple(idx)] = expr_subs
else:
destination[res[0]] = expr_subs
except StopIteration:
break
except TimeoutError:
num_args_list.append(args_list[i] + [substitutions])
except ProcessExpired as e:
start = "\033[1m"
end = "\033[0;0m"
warnings.warn("An integration process expired. " + start +
"Be suspicious of the resulting inner products and tensors" + end + ".")
print("%s. Exit code: %d" % (e, e.exitcode))
except Exception as e:
start = "\033[1m"
end = "\033[0;0m"
warnings.warn("Something went wrong with an integration process. " + start +
"Be suspicious of the resulting inner products and tensors" + end + ".")
print("Function raised %s" % e)
print(e.traceback) # Python's traceback of remote process
i += 1
else:
num_args_list = [list(args) + [substitutions] for args in args_list]
future = pool.map(numerical_integration, num_args_list)
results = future.result()
if permute:
while True:
try:
res = next(results)
i = res[0][0]
idx = res[0][1:]
perm_idx = multiset_permutations(idx)
for perm in perm_idx:
idx = [i] + perm
destination[tuple(idx)] = res[1]
except StopIteration:
break
else:
while True:
try:
res = next(results)
destination[res[0]] = res[1]
except StopIteration:
break
if return_dict:
return destination
def _inner_product_arguments(args):
return args[1], args[3], args[4]._inner_product_arguments(args[0], args[1], args[2])
[docs]
def parallel_symbolic_evaluation(pool, indices_list, inner_product, basis, numerical, term):
"""Functions to evaluate |Sympy| inner products expressions inside tensors in parallel.
Parameters
----------
pool: concurrent.futures.ThreadPoolExecutor or concurrent.futures.ProcessPoolExecutor
A pool of workers.
indices_list: list(tuple)
A list of tuples with the indices of the tensor entries.
inner_product: callable
A callable defining the inner products.
basis: list(SymbolicBasis)
List of symbolic function basis on which each element of the term(s) inner products must be decomposed.
numerical: bool
Whether to compute numerical or symbolic inner products.
term: ArithmeticTerms
Arithmetic term(s) from which the inner products will be computed.
Returns
-------
tuple(2-tuple)
A list with the results, as 2-tuple with the labelling indices and the output of the evaluation.
"""
args_list_in = [(basis, indices, numerical, inner_product, term) for indices in indices_list]
return pool.map(_inner_product_arguments, args_list_in)
def _quit_function(fn_name):
thread.interrupt_main()
[docs]
def exit_after(s):
"""
use as decorator to exit process if
function takes longer than s seconds
"""
def outer(fn):
def inner(*args, **kwargs):
timer = threading.Timer(s, _quit_function, args=[fn.__name__])
timer.start()
try:
result = fn(*args, **kwargs)
finally:
timer.cancel()
return result
return inner
return outer