分享一个异步任务在遇到IO异常时支持递归回调的辅助方法

时间:2023-03-09 04:27:55
分享一个异步任务在遇到IO异常时支持递归回调的辅助方法
public void TryAsyncActionRecursively<TAsyncResult>(
string asyncActionName,
Func<Task<TAsyncResult>> asyncAction,
Action<int> mainAction,
Action<TAsyncResult> successAction,
Func<string> getContextInfoFunc,
Action<Exception> failedAction,
int retryTimes) where TAsyncResult : AsyncOperationResult
{
var retryAction = new Action<int>(currentRetryTimes =>
{
if (currentRetryTimes >= _immediatelyRetryTimes)
{
Task.Factory.StartDelayedTask(_retryIntervalForIOException, () => mainAction(currentRetryTimes + ));
}
else
{
mainAction(currentRetryTimes + );
}
});
var executeFailedAction = new Action<Exception>(ex =>
{
try
{
if (failedAction != null)
{
failedAction(ex);
}
}
catch (Exception unknownEx)
{
_logger.Error(string.Format("Failed to execute the failedCallbackAction of asyncAction:{0}, contextInfo:{1}",
asyncActionName, getContextInfoFunc()), unknownEx);
}
});
var processTaskException = new Action<Exception, int>((ex, currentRetryTimes) =>
{
if (ex is IOException)
{
_logger.Error(string.Format("Async task '{0}' has io exception, contextInfo:{1}, current retryTimes:{2}",
asyncActionName, getContextInfoFunc(), currentRetryTimes), ex);
retryAction(retryTimes);
}
else
{
_logger.Error(string.Format("Async task '{0}' has unknown exception, contextInfo:{1}, current retryTimes:{2}",
asyncActionName, getContextInfoFunc(), currentRetryTimes), ex);
executeFailedAction(ex);
}
});
var completeAction = new Action<Task<TAsyncResult>>(t =>
{
if (t.Exception != null)
{
processTaskException(t.Exception.InnerException, retryTimes);
return;
}
if (t.IsCanceled)
{
_logger.ErrorFormat("Async task '{0}' was cancelled, contextInfo:{1}, current retryTimes:{2}",
asyncActionName, getContextInfoFunc(), retryTimes);
retryAction(retryTimes);
return;
}
var result = t.Result;
if (result.Status == AsyncOperationResultStatus.IOException)
{
_logger.ErrorFormat("Async task '{0}' has io exception, contextInfo:{1}, current retryTimes:{2}, errorMsg:{3}",
asyncActionName, getContextInfoFunc(), retryTimes, result.ErrorMessage);
retryAction(retryTimes);
return;
}
if (successAction != null)
{
successAction(result);
}
}); try
{
asyncAction().ContinueWith(completeAction);
}
catch (IOException ex)
{
_logger.Error(string.Format("Execute async action '{0}' failed, contextInfo:{1}, current retryTimes:{2}",
asyncActionName, getContextInfoFunc(), retryTimes), ex);
retryAction(retryTimes);
}
catch (Exception ex)
{
_logger.Error(string.Format("Execute async action '{0}' failed, contextInfo:{1}, current retryTimes:{2}",
asyncActionName, getContextInfoFunc(), retryTimes), ex);
executeFailedAction(ex);
}
}

该函数的功能是:执行一个异步任务(返回Task的方法),如果执行出现IO异常,则重试当前主函数(mainAction);用户的mainAction中会再次调用TryAsyncActionRecursively方法。从而实现当遇到IO异常时,能做到不断重试。另外,重试只立即重试指定的次数,超过指定次数,则不立即重试,而是暂停一定间隔后再次执行。该函数还提供当acyncAction执行成功或失败后的回调函数,以及允许传入当前上下文的一些说明信息,以便记录有意义的错误日志信息。

下面是使用示例:

private void PublishEventAsync(ProcessingCommand processingCommand, EventStream eventStream, int retryTimes)
{
TryAsyncActionRecursively<AsyncOperationResult>("PublishEventAsync",
() => _eventPublisher.PublishAsync(eventStream),
currentRetryTimes => PublishEventAsync(processingCommand, eventStream, currentRetryTimes),
result =>
{
_logger.DebugFormat("Publish events success, {0}", eventStream);
processingCommand.Complete(new CommandResult(CommandStatus.Success, processingCommand.Command.Id));
},
() => string.Format("[eventStream:{0}]", eventStream),
ex => processingCommand.Complete(new CommandResult(CommandStatus.Failed, processingCommand.Command.Id)),
retryTimes);
}
PublishEventAsync(processingCommand, eventStream, );