因此,我们开始:给定一个Confluent.Kafka
IConsumer<>
,它将其包装到专用的async
CE中,并在不要求取消的情况下使用它。该代码段还针对OperationCancelledException
和运行finally
块进行防御,以确保使用者的正常终止。
let private consumeUntiCancelled callback (consumer: IConsumer<'key, 'value>) =
async {
let! ct = Async.CancellationToken
try
try
while not ct.IsCancellationRequested do
let consumeResult = consumer.Consume(ct)
if not consumeResult.IsPartitionEOF then do! (callback consumeResult)
with
| :? OperationCanceledException -> return ()
finally
consumer.Close()
consumer.Dispose()
}
问题1:此代码正确还是我在滥用async
?
到现在为止还挺好。在我的应用程序中,我必须处理许多必须完全死亡的消费者。因此,假设consumers: seq<Async<unit>>
代表他们,那么我想出了以下代码:
async {
for consumer in consumers do
do! (Async.StartChild consumer |> Async.Ignore).
}
我希望这段代码将子级链接到父级的取消上下文,并且一旦取消,子级也将被取消。
问题2:即使孩子被取消了,我的finally代码块是否也可以保证运行?
关于您的代码,我有两个发现:
您对的使用Async.StartChild
是正确的-所有子计算都将继承相同的取消令牌,并且在取消主令牌时它们都将被取消。
可以在调用之后consumer.Consume(ct)
和调用之前取消异步工作流callback
。我不确定这对您的特定问题意味着什么,但是如果它从队列中删除了一些数据,则在处理数据之前可能会丢失它们。如果这是一个问题,那么我认为您需要使callback
非异步,或以不同的方式调用它。
在您的consumeUntilCancelled
函数中,您不需要显式检查,如果不是,ct.IsCancellationRequested
则为true。异步工作流程会在do!
或中自动执行此操作let!
,因此您可以将其替换为一个while
循环。
这是最小的独立演示:
let consume s = async {
try
while true do
do! Async.Sleep 1000
printfn "%s did work" s
finally
printfn "%s finalized" s }
let work =
async {
for c in ["A"; "B"; "C"; "D"] do
do! Async.StartChild (consume c) |> Async.Ignore }
现在,我们使用取消令牌创建计算:
// Run this in F# interactive
let ct = new System.Threading.CancellationTokenSource()
Async.Start(work, ct.Token)
// Run this sometime later
ct.Cancel()
一旦调用ct.Cancel
,finally
将调用所有块,并且所有循环将停止。