温馨提示:本文翻译自stackoverflow.com,查看原文请点击:apache kafka - F# async: parent/child cancellation?
apache-kafka confluent-kafka f# f#-async

apache kafka - F#异步:取消父/子?

发布于 2020-04-19 12:05:21

因此,我们开始:给定一个Confluent.Kafka IConsumer<>,它将其包装到专用的asyncCE中,并在不要求取消的情况下使用它。该代码段还针对OperationCancelledException和运行finally块进行防御以确保使用者的正常终止。

let private consumeUntiCancelled callback (consumer: IConsumer<'key, 'value>) =
    async {
            let! ct = Async.CancellationToken
            try
                try
                    while not ct.IsCancellationRequested do 
                        let consumeResult = consumer.Consume(ct)
                        if not consumeResult.IsPartitionEOF then do! (callback consumeResult) 
                with
                | :? OperationCanceledException -> return ()
            finally
                consumer.Close()
                consumer.Dispose()
    }

问题1:此代码正确还是我在滥用async

到现在为止还挺好。在我的应用程序中,我必须处理许多必须完全死亡的消费者因此,假设consumers: seq<Async<unit>>代表他们,那么我想出了以下代码:

async {
        for consumer in consumers do 
            do! (Async.StartChild consumer |> Async.Ignore).
    }

我希望这段代码将子级链接到父级的取消上下文,并且一旦取消,子级也将被取消。

问题2:即使孩子被取消了,我的finally代码块是否也可以保证运行?

查看更多

提问者
Sereja Bogolubov
被浏览
46
Tomas Petricek 2020-02-05 05:23

关于您的代码,我有两个发现:

  • 您对的使用Async.StartChild是正确的-所有子计算都将继承相同的取消令牌,并且在取消主令牌时它们都将被取消。

  • 可以在调用之后consumer.Consume(ct)调用之前取消异步工作流callback我不确定这对您的特定问题意味着什么,但是如果它从队列中删除了一些数据,则在处理数据之前可能会丢失它们。如果这是一个问题,那么我认为您需要使callback非异步,或以不同的方式调用它。

  • 在您的consumeUntilCancelled函数中,您不需要显式检查,如果不是,ct.IsCancellationRequested则为true。异步工作流程会在do!或中自动执行此操作let!,因此您可以将其替换为一个while循环。

这是最小的独立演示:

let consume s = async {
  try
    while true do 
      do! Async.Sleep 1000
      printfn "%s did work" s
  finally
    printfn "%s finalized" s }

let work = 
  async {
    for c in ["A"; "B"; "C"; "D"] do  
      do! Async.StartChild (consume c) |> Async.Ignore }

现在,我们使用取消令牌创建计算:

// Run this in F# interactive
let ct = new System.Threading.CancellationTokenSource()
Async.Start(work, ct.Token)

// Run this sometime later
ct.Cancel()

一旦调用ct.Cancelfinally调用所有块,并且所有循环将停止。