FTN12: FutoIn Async API
Version: 1.11
Date: 2018-02-02
Copyright: 2014-2018 FutoIn Project (http://futoin.org)
Authors: Andrey Galkin
This interface was born as a secondary option for the executor concept. However, it quickly became clear that async/reactor/proactor/light threads/etc. should be the basis for scalable high-performance server implementations, even though they are more difficult to understand and/or debug. Traditional synchronous program flow becomes an add-on on top of the asynchronous base for legacy code and/or too complex logic.
Program flow is split into non-blocking execution steps, represented with an execution callback function. Processing Unit (e.g. CPU) halting/spinning/switching-to-another-task is seen as a blocking action in program flow.
Any step must not call any blocking functions, except for synchronization with a guaranteed minimal period of lock acquisition. Note: under "minimal period", it is assumed that any acquired lock is immediately released after an action of O(1) complexity, with no delay caused by programmatic suspension/locking of the executing task.
Steps are executed sequentially. The success result of any step becomes the input for the following step.
Each step can have its own error handler. The error handler is called if AsyncSteps.error() is called within step execution or any of its sub-steps. Typical behavior is either to ignore the error and continue, or to perform cleanup actions and complete the job with an error.
Each step can have its own sequence of sub-steps. Sub-steps can be added only during that step's execution. The sub-step sequence is executed after the current step's execution is finished.
If any sub-steps are added, the current step must not call AsyncSteps.success() or AsyncSteps.error(). Otherwise, InternalError is raised.
It is possible to create a special "parallel" sub-step and add independent sub-steps to it. Execution of all parallel sub-steps is started together. The parallel step completes with success when all sub-steps complete with success. If an error is raised in any sub-step of a parallel step, all other sub-steps are canceled.
Out-of-order cancellation of execution can occur on timeout, by execution control engine decision (e.g. Invoker disconnect) or on failure of a sibling parallel step. Each step can install a custom on-cancel handler to free resources and/or cancel external jobs. After cancellation, it must be safe to destroy the AsyncSteps object.
AsyncSteps must be used in Executor request processing. The same [root] AsyncSteps object must be used for all asynchronous tasks within a given request processing.
AsyncSteps may be used by Invoker implementations.
AsyncSteps must support derived classes in an implementation-defined way. Typical use case: functionality extension (e.g. request processing API).
For performance reasons, it is not economical to initialize AsyncSteps with business logic every time. Every implementation must support platform-specific AsyncSteps cloning/duplicating.
When an AsyncSteps (or derived) object is created, all steps are added sequentially at Level 0 through add() and/or parallel(). Note: each parallel() is seen as a step.
After AsyncSteps execution is initiated, each step of Level 0 is executed. All sub-steps are added at Level n+1. Example:
add() -> Level 0 #1
add() -> Level 1 #1
add() -> Level 2 #1
parallel() -> Level 2 #2
add() -> Level 2 #3
parallel() -> Level 1 #2
add() -> Level 1 #3
parallel() -> Level 0 #2
add() -> Level 0 #3
Execution cannot continue to the next step of the current Level until all steps of the higher Level are executed.
The execution sequence would be:
Level 0 add #1
Level 1 add #1
Level 2 add #1
Level 2 parallel #2
Level 2 add #3
Level 1 parallel #2
Level 1 add #3
Level 0 parallel #2
Level 0 add #3
Due to the non-linear program flow, classic try/catch blocks are converted into execute/onerror callbacks. Each added step may have a custom error handler. If an error handler is not specified, control is passed to the lower Level error handler. If none is defined, execution is aborted.
Example:
add( -> Level 0
func( as ){
print( "Level 0 func" )
add( -> Level 1
func( as ){
print( "Level 1 func" )
as.error( "myerror" )
},
onerror( as, error ){
print( "Level 1 onerror: " + error )
as.error( "newerror" )
}
)
},
onerror( as, error ){
print( "Level 0 onerror: " + error )
as.success( "Prm" )
}
)
add( -> Level 0
func( as, param ){
print( "Level 0 func2: " + param )
as.success()
}
)
Output would be:
Level 0 func
Level 1 func
Level 1 onerror: myerror
Level 0 onerror: newerror
Level 0 func2: Prm
In a synchronous way, it would look like:
variable = null
try
{
print( "Level 0 func" )
try
{
print( "Level 1 func" )
throw "myerror"
}
catch ( error )
{
print( "Level 1 onerror: " + error )
throw "newerror"
}
}
catch( error )
{
print( "Level 0 onerror: " + error )
variable = "Prm"
}
print( "Level 0 func2: " + variable )
Very often, an error handler creates an alternative complex program path which requires its own async operations. Therefore, the error handler must accept as.add() as an implicit as.success().
If steps are added inside an error handler, they must remain at the same async stack level while the error handler itself gets removed.
Example:
add( -> Level 0
func( as ){
print( "Level 0 func" )
add( -> Level 1
func( as ){
print( "Level 1 func" )
as.error( "first" )
},
onerror( as, error ){
print( "Level 1 onerror: " + error )
as.add( -> Level 2
func( as ){
print( "Level 2 func" )
as.error( "second" );
},
onerror( as, error ) {
print( "Level 2 onerror: " + error )
}
)
}
)
},
onerror( as, error ){
print( "Level 0 onerror: " + error )
}
)
Output would be:
Level 0 func
Level 1 func
Level 1 onerror: first
Level 2 func
Level 2 onerror: second
Level 0 onerror: second
Note: "Level 1 onerror" is not executed a second time!
Very often, execution of a step cannot continue without waiting for an external event, like input from network or disk. It is forbidden to block execution while waiting for an event. As a solution, there are special setTimeout() and setCancel() methods.
Example:
add(
func( as ){
socket.read( function( data ){
as.success( data )
} )
as.setCancel( function(){
socket.cancel_read()
} )
as.setTimeout( 30_000 ) // 30 seconds
},
onerror( as, error ){
if ( error == timeout ) {
print( "Timeout" )
}
else
{
print( "Read Error" )
}
}
)
By definition of parallel steps, it makes no sense to continue execution if any of the steps fails. To avoid excessive time and resources spent on the other steps, there is a concept of canceling execution, similar to the timeout above.
Example:
as.parallel()
.add(
func( as ){
as.setCancel( function(){ ... } )
// do parallel job #1
as.state()->result1 = ...;
}
)
.add(
func( as ){
as.setCancel( function(){ ... } )
// do parallel job #2
as.state()->result2 = ...;
}
)
.add(
func( as ){
as.error( "Some Error" )
}
)
as.add(
func( as ){
print( as.state()->result1 + as.state()->result2 )
as.success()
}
)
In long-living applications, the same business logic may be re-used multiple times during execution.
In a REST API server example, complex business logic can be defined only once and stored in a kind of AsyncSteps object repository. On each request, a reference object from the repository would be copied for actual processing with minimal overhead.
However, there would be no performance gain for sub-step definitions unless their callback functions are also created at initialization time rather than at parent step execution time (the default concept). So, it should be possible to predefine those as well and copy/inherit them during step execution. Copying steps must also involve copying of state variables.
Example:
AsyncSteps req_repo_common;
req_repo_common.add(func( as ){
as.add( func( as ){ ... } );
as.copyFrom( as.state().business_logic );
as.add( func( as ){ ... } );
});
AsyncSteps req_repo_buslog1;
req_repo_buslog1
.add(func( as ){ ... })
.add(func( as ){ ... });
AsyncSteps actual_exec = copy req_repo_common;
actual_exec.state().business_logic = req_repo_buslog1;
actual_exec.execute();
However, this approach only makes sense for deep performance optimizations.
If there are no sub-steps added, no timeout set and no cancel handler set, then an implicit as.success() call is assumed to simplify code and increase efficiency.
as.add(func( as ){
doSomeStuff( as );
})
As in many cases it is required to wait for an external event without any additional conditions, the general approach used to be adding an empty cancel handler. To avoid that, an explicit .waitExternal() API is available.
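For illustration, a minimal sketch of waiting for an external event with .waitExternal(); callSomeExternal() stands for any technology-specific callback-based API, and a fuller variant appears in the pseudo-code examples at the end:
add(
    func( as ){
        as.waitExternal() // no cancel handler needed, implicit success is disabled
        callSomeExternal( function( err, data ){
            if ( err )
            {
                as.error( err )
            }
            else
            {
                as.success( data )
            }
        } )
    }
)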
Pre-defined state variables:
An error code is not always descriptive enough, especially if it can be generated in multiple ways. As a convention, the special "error_info" state field should hold descriptive information about the last error. Therefore, as.error() is extended with an optional error_info parameter.
The "last_exception" state variable may hold the last caught exception object, if feasible to implement. It should be populated for FutoIn errors as well.
Almost always, async program flow is not linear. Sometimes, loops are required.
Basic principles of async loops:
as.loop( func( as ){
call_some_library( as );
as.add( func( as, result ){
if ( !result )
{
// exit loop
as.break();
}
} );
} )
Inner loops and identifiers:
// start loop
as.loop(
func( as ){
as.loop( func( as ){
call_some_library( as );
as.add( func( as, result ){
if ( !result )
{
// exit loop
as.continue( "OUTER" );
}
as.success( result );
} );
} );
as.add( func( as, result ){
// use it somehow
as.success();
} );
},
"OUTER"
)
Loop n times.
as.repeat( 3, func( as, i ){
print( 'Iteration: ' + i )
} )
Traverse through list or map:
as.forEach(
[ 'apple', 'banana' ],
func( as, k, v ){
print( k + " = " + v )
}
)
Normal loop termination is performed either by the loop condition (e.g. as.forEach(), as.repeat()) or by an as.break() call. Normal termination is seen as an as.success() call.
Abnormal termination is possible through as.error(), including timeout, or through external as.cancel(). Abnormal termination is seen as an as.error() call.
If any of the API identifiers clashes with a reserved word or contains illegal symbols, implementation-defined name mangling is allowed, with the following guideline taking priority: pre-defined alternative method names should be used when the default matches a language-specific reserved keyword.
As with any multi-threaded application, multi-step cases may also require synchronization to ensure that not more than N steps enter the same critical section. This is implemented as the Mutex class.
For general stability reasons and protection against self-inflicted DoS, it may be required to limit the number of steps allowed to enter a critical section within a time period. This is implemented as the Throttle class.
A special .sync(obj, step, err_handler) API is available to synchronize against any object supporting the synchronization protocol .sync(as, step, err_handler).
A synchronization object is allowed to add its own steps and is responsible for adding the requested steps under protection of the provided synchronization. The synchronization object must correctly handle canceled execution and possible errors.
Incoming success parameters must be passed to the critical section step. Resulting success parameters must be forwarded to the following steps as if there were no critical section logic.
All synchronization implementations must either allow multiple re-entrancy of the same AsyncSteps instance or properly detect such an event and raise an error.
All implementations must correctly detect parallel flows in the scope of a single AsyncSteps instance and treat each as a separate one. None of the parallel steps should inherit the lock state of the parent step.
Deadlock detection is optional and not mandatory.
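A sketch of success parameter pass-through for a critical section, assuming a hypothetical MutexImpl as in the pseudo-code examples at the end of this document:
MutexImpl mtx; // default: max=1 entry in the critical section

as.add(
    func( as ){
        as.success( "input" )
    }
)
as.sync(
    mtx,
    func( as, param ){
        // incoming success parameters are passed to the critical section step
        as.success( param + " processed" )
    },
    onerror( as, error ){
        print( "Sync error: " + error )
    }
)
as.add(
    func( as, result ){
        // resulting success parameters are forwarded as if there was no critical section
        print( result ) // "input processed"
    }
)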
It may be required to limit the maximum number of pending AsyncSteps flows. If the overall queue limit is reached, new entries must get the "DefenseRejected" error.
Request processing stability requires limiting both simultaneous connections and request rate. Therefore, a special synchronization primitive Limiter, wrapping Mutex and Throttle, is introduced to impose limits in scope.
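A sketch of Limiter usage, assuming a hypothetical LimiterImpl with the constructor options listed in the class reference below; the option values are arbitrary:
LimiterImpl limiter( {
    concurrent : 8,    // simultaneous flows in the protected section
    max_queue  : 128,  // pending flows before "DefenseRejected"
    rate       : 100,  // entries per period
    period_ms  : 1000, // period length
    burst      : 32    // queue for rate limiting
} )

as.sync(
    limiter,
    func( as ){
        // request processing under both concurrency and rate limits
    },
    onerror( as, error ){
        if ( error == "DefenseRejected" )
        {
            print( "Overall queue limit reached" )
        }
    }
)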
Sometimes, it is required to return a value after inner steps are executed. It leads to code like:
value = 123;
as.add( subStep() );
as.add( (as) => as.success( value ) );
To optimize and make the code cleaner, the previously deprecated successStep() is brought back. Example:
value = 123;
as.add( subStep() );
as.successStep( value );
As Promises and await patterns become more and more popular in modern technologies, AsyncSteps should support them through the as.await(future_or_promise) call.
Details of the implementation are specific to the particular technology. However, the following guidelines should be used:
- If future_or_promise is cancellable then as.setCancel() must be used.
- Otherwise, as.waitExternal() is to be used.
- Rejection/failure must be converted into an as.error() call.
- Resolution/success must be converted into an as.success() call.
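A sketch of the intended usage; some_promise stands for a technology-specific Promise/Future object:
as.add(
    func( as ){
        as.await( some_promise )
    }
)
as.add(
    func( as, result ){
        // the resolved value is forwarded as a success parameter
        print( result )
    }
)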
void execute_callback( AsyncSteps as[, previous_success_args] )
- the first argument is always the AsyncSteps object
- the other arguments come from the as.success() call of the previous step
- behavior, one of:
    - set the completion status through as.success() or as.error()
    - add sub-steps through as.add() and/or as.parallel()
    - wait for an external event with a timeout through as.setTimeout() and/or set a cancel handler through as.setCancel()
    - do nothing for implicit success
- calling as.success() or as.error() after sub-steps have been added raises as.error( InternalError ). Not applicable to implicit success.
- can use as.state() for global current job state data

void error_callback( AsyncSteps as, error )
- the first argument is always the AsyncSteps object
- the second argument is the error name from the as.error() call
- behavior, one of:
    - as.success() - continue execution from the next step, after return
    - as.error() - change error string
    - as.add() - added sub-steps act as an implicit as.success()
    - as.error( InternalError ) is raised on invalid completion (e.g. as.success() or as.error() after sub-steps have been added)

void cancel_callback( AsyncSteps as )
- called on out-of-order cancellation to free resources and/or cancel external jobs (see setCancel())

interface ISync
- interface of synchronization objects with a single method:
    - void sync( AsyncSteps, execute_callback[, error_callback] )
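For illustration only, a minimal no-op ISync sketch which adds the requested step without providing any actual protection; real primitives like Mutex and Throttle must also queue flows and handle cancellation:
class NoopSync implements ISync
{
    void sync( AsyncSteps as, execute_callback step, error_callback onerror )
    {
        // a real primitive would acquire here and release on completion or cancel
        as.add( step, onerror )
    }
}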
It is assumed that all functions in this section are part of a single AsyncSteps interface. However, they are grouped by the semantic scope of use.
AsyncSteps add( execute_callback func[, error_callback onerror] )
- adds a step with an optional error handler

AsyncSteps parallel( [error_callback onerror] )
- creates a special parallel step and returns an interface for adding independent sub-steps to it
- success() does not allow any arguments - use state() to pass results

Map state()
- returns a reference to the map/object holding global current job state data

AsyncSteps copyFrom( AsyncSteps other )
- copies steps and state variables from another AsyncSteps object (see model step re-use above)

AsyncSteps sync( ISync obj, execute_callback func[, error_callback onerror] )
- adds a step executed under protection of the provided synchronization object

AsyncSteps successStep( [result_arg, ...] )
- shortcut for as.add( (as) => as.success( result_arg, ... ) )

AsyncSteps await( future_or_promise[, error_callback onerror] )
- integrates a technology-specific Future/Promise into the step flow

Note: success() and error() can be used in error_callback as well.
void success( [result_arg, ...] )
- completes the current step; result arguments become the input for the following step

void error( name [, error_info] )
- completes the current step with an error and throws a FutoIn.Error exception
- the associated onerror( async_iface, name ) handler is called after returning to the execution engine
- error_info - assigned to the "error_info" state field

void setTimeout( timeout_ms )
- limits the time of waiting for completion; a timeout error is raised on expiration

call operator overloading
- implementations may additionally expose functionality through call operator overloading, where the language/platform supports it

void setCancel( cancel_callback oncancel )
- sets a cancel handler to free resources and/or cancel external jobs on out-of-order cancellation

void waitExternal()
- marks the step as waiting for an external event; disables the implicit as.success() behavior of the current step

execute()
- must be called only once after root object steps are configured

cancel()
- may be called on the root object to asynchronously cancel execution
void loop( func [, label] )
- executes the loop body until as.break() is called
- func( as ) - loop body (an execute_callback)
- label - optional label to use for as.break() and as.continue() in inner loops

void forEach( map|list, func [, label] )
- calls func( as, key, value ) for each map or list element
- func( as, key, value ) - loop body
- label - optional label to use for as.break() and as.continue() in inner loops

void repeat( count, func [, label] )
- calls func( as, i ) count times
- count - how many times to call the func
- func( as, i ) - loop body, i - current iteration starting from 0
- label - optional label to use for as.break() and as.continue() in inner loops

void break( [label] )
- breaks the current loop
- label - unwind loops until the loop named label is exited

void continue( [label] )
- continues the current loop from the next iteration
- label - break loops until the loop named label is found

Mutex class
- implements the ISync interface
- c-tor( unsigned integer max=1, unsigned integer max_queue=null )
- max - maximum number of steps allowed in the critical section at the same time
- max_queue - optionally, limit queue length

Throttle class
- implements the ISync interface
- c-tor( unsigned integer max, unsigned integer period_ms=1000, unsigned integer max_queue=null )
- max - maximum number of critical section entries within the time period
- period_ms - time period in milliseconds
- max_queue - optionally, limit queue length

Limiter class
- implements the ISync interface
- c-tor( options )
- options.concurrent=1 - maximum concurrent flows
- options.max_queue=0 - maximum queued
- options.rate=1 - maximum entries in period
- options.period_ms=1000 - period length
- options.burst=0 - maximum queue for rate limiting

The examples below are in pseudo-code.
AsyncStepsImpl as;
as.add(
function( inner_as ){
if ( something )
inner_as.success( 1, 2 )
else
inner_as.error( NotImplemented )
},
function( inner_as, error ){
externalError( error );
}
).add(
function( inner_as, res1, res2 ){
externalSuccess( res1, res2 );
},
)
AsyncStepsImpl as;
as.add(
function( inner_as ){
inner_as.add(
function( inner2_as ){
if ( something )
inner2_as.success( 1 )
else
inner2_as.error( NotImplemented )
},
function( inner2_as, error )
{
log( "Spotted error " + error )
// continue with higher level error handlers
}
)
inner_as.add(
function( inner2_as, res1 ){
inner2_as.success( res1, 2 )
}
)
},
function( inner_as, error ){
externalError( error );
}
).add(
function( inner_as, res1, res2 ){
externalSuccess( res1, res2 );
},
)
AsyncStepsImpl as;
as.add(
function( inner_as ){
inner_as.parallel().add(
function( inner2_as ){
inner2_as.state().parallel_1 = 1;
},
function( inner2_as, error )
{
log( "Spotted error " + error )
// continue with higher level error handlers
}
).add(
function( inner2_as ){
inner2_as.state().parallel_2 = 2;
},
function( inner2_as, error )
{
inner2_as.state().parallel_2 = 0;
// ignore error
}
)
},
function( inner_as, error ){
externalError( error );
}
).add(
function( inner_as ){
    externalSuccess(
        inner_as.state().parallel_1,
        inner_as.state().parallel_2
);
},
)
AsyncStepsImpl as;
as.add(
function( as ){
as.repeat( 3, function( as, i ) {
print i;
} );
as.forEach( [ 1, 3, 3 ], function( as, k, v ) {
print k "=" v;
} );
as.forEach( as.state(), function( as, k, v ) {
print k "=" v;
} );
},
)
AsyncStepsImpl as;
as.add(
function( as ){
as.waitExternal();
callSomeExternal( function(err) {
if (err)
{
try {
as.error(err);
} catch {
// ignore
}
}
else
{
as.success();
}
} );
},
)
AsyncStepsImpl as;
MutexImpl mutex(10);
as.sync(
mutex,
function( as ){
// critical section with regular AsyncSteps
},
)
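A similar sketch with the Throttle primitive; ThrottleImpl is a hypothetical implementation with the c-tor described above:
AsyncStepsImpl as;
ThrottleImpl throttle( 100, 1000 ); // at most 100 entries per 1000 ms

as.sync(
    throttle,
    function( as ){
        // rate-limited section with regular AsyncSteps
    },
)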
=END OF SPEC=