Warm tip: This article is reproduced from stackoverflow.com, please click
apache-kafka confluent-kafka f# f#-async

F# async: parent/child cancellation?

发布于 2020-04-19 09:22:41

So here we go: given a Confluent.Kafka IConsumer<>, it wraps it into a dedicated async CE and consumes as long as cancellation hasn't been requested. This piece of code is also defends itself against the OperationCancelledException and runs finally block to ensure graceful termination of consumer.

let private consumeUntiCancelled callback (consumer: IConsumer<'key, 'value>) =
    async {
            let! ct = Async.CancellationToken
            try
                try
                    while not ct.IsCancellationRequested do 
                        let consumeResult = consumer.Consume(ct)
                        if not consumeResult.IsPartitionEOF then do! (callback consumeResult) 
                with
                | :? OperationCanceledException -> return ()
            finally
                consumer.Close()
                consumer.Dispose()
    }

Question #1: is this code correct or am I abusing the async?

So far so good. In my app I have to deal with lots of consumers that must die altogether. So, assuming that consumers: seq<Async<unit>> represents them, the following code is what I came up with:

async {
        for consumer in consumers do 
            do! (Async.StartChild consumer |> Async.Ignore).
    }

I expect this code to chain childs to the parent's cancellation context, and once it is cancelled, childs gonna be cancelled as well.

Question #2: is my finally block guaranteed to be ran even though child got cancelled?

Questioner
Sereja Bogolubov
Viewed
46
Tomas Petricek 2020-02-05 05:23

I have two observations about your code:

  • Your use of Async.StartChild is correct - all child computations will inherit the same cancellation token and they will all get cancelled when the main token is cancelled.

  • The async workflow can be cancelled after you call consumer.Consume(ct) and before you call callback. I'm not sure what this means for your specific problem, but if it removes some data from a queue, the data could be lost before it is processed. If that's an issue, then I think you'll need to make callback non-asynchronous, or invoke it differently.

  • In your consumeUntilCancelled function, you do not explicity need to check while not if ct.IsCancellationRequested is true. The async workflow does this automatically in every do! or let!, so you can replace this with just a while loop.

Here is a minimal stand-alone demo:

let consume s = async {
  try
    while true do 
      do! Async.Sleep 1000
      printfn "%s did work" s
  finally
    printfn "%s finalized" s }

let work = 
  async {
    for c in ["A"; "B"; "C"; "D"] do  
      do! Async.StartChild (consume c) |> Async.Ignore }

Now we create the computation with a cancellation token:

// Run this in F# interactive
let ct = new System.Threading.CancellationTokenSource()
Async.Start(work, ct.Token)

// Run this sometime later
ct.Cancel()

Once you call ct.Cancel, all the finally blocks will be called and all the loops will stop.