Generic Structure

AsyncThrowingStream

An asynchronous sequence generated from an error-throwing closure that calls a continuation to produce new elements.

Declaration 宣言

struct AsyncThrowingStream<Element, Failure> where Failure : Error

Overview 概要

AsyncThrowingStream conforms to AsyncSequence, providing a convenient way to create an asynchronous sequence without manually implementing an asynchronous iterator. In particular, an asynchronous stream is well-suited to adapt callback- or delegation-based APIs to participate with async-await.

In contrast to AsyncStream, this type can throw an error from the awaited next(), which terminates the stream with the thrown error.

You initialize an AsyncThrowingStream with a closure that receives an AsyncThrowingStream.Continuation. Produce elements in this closure, then provide them to the stream by calling the continuation’s yield(_:) method. When there are no further elements to produce, call the continuation’s finish() method. This causes the sequence iterator to produce a nil, which terminates the sequence. If an error occurs, call the continuation’s finish(throwing:) method, which causes the iterator’s next() method to throw the error to the awaiting call point. The continuation is Sendable, which permits calling it from concurrent contexts external to the iteration of the AsyncThrowingStream.

An arbitrary source of elements can produce elements faster than they are consumed by a caller iterating over them. 要素いくつかからなるある随意のソースは、要素それらを、それらのすべてにわたって反復している呼び出し側によってそれらが消費されるよりも速く生み出すことがありえます。 Because of this, AsyncThrowingStream defines a buffering behavior, allowing the stream to buffer a specific number of oldest or newest elements. By default, the buffer limit is Int.max, which means it’s unbounded.

Adapting Existing Code to Use Streams 既存のコードを適応させてストリームを使うようにする

To adapt existing callback code to use async-await, use the callbacks to provide values to the stream, by using the continuation’s yield(_:) method. 既存のコールバックコードをasync-awaitを使うように適応させるには、コールバックそれらを使って値それらをストリームへと、継続のもつyield(_:)メソッドを使うことによって提供してください。

Consider a hypothetical QuakeMonitor type that provides callers with Quake instances every time it detects an earthquake. To receive callbacks, callers set a custom closure as the value of the monitor’s quakeHandler property, which the monitor calls back as necessary. ある仮定的なQuakeMonitor型を考えてください、それは呼び出し側それらにQuakeインスタンスそれらを、それがある地震を検出するたびごとに提供します。コールバックそれらを受け取るには、呼び出し側それらはあるあつらえのクロージャをモニタのもつquakeHandlerプロパティの値として設定します、それはモニタが必要に応じて折り返し呼び出しするものです。 Callers can also set an errorHandler to receive asychronous error notifications, such as the monitor service suddenly becoming unavailable.


class QuakeMonitor {
    var quakeHandler: ((Quake) -> Void)?
    var errorHandler: ((Error) -> Void)?


    func startMonitoring() {}
    func stopMonitoring() {}
}

To adapt this to use async-await, extend the QuakeMonitor to add a quakes property, of type AsyncThrowingStream<Quake>. In the getter for this property, return an AsyncThrowingStream, whose build closure – called at runtime to create the stream – uses the continuation to perform the following steps:

  1. Creates a QuakeMonitor instance. あるQuakeMonitorインスタンスを作成します。

  2. Sets the monitor’s quakeHandler property to a closure that receives each Quake instance and forwards it to the stream by calling the continuation’s yield(_:) method. モニタのもつquakeHandlerプロパティをあるクロージャへと設定します、それは各Quakeインスタンスを受け取って、そしてそれをストリームへと、継続のもつyield(_:)メソッドを呼び出すことによって転送します。

  3. Sets the monitor’s errorHandler property to a closure that receives any error from the monitor and forwards it to the stream by calling the continuation’s finish(throwing:) method. This causes the stream’s iterator to throw the error and terminate the stream.

  4. Sets the continuation’s onTermination property to a closure that calls stopMonitoring() on the monitor. 継続のもつonTerminationプロパティをあるクロージャへと設定します、それはstopMonitoring()をモニタ上で呼び出します。

  5. Calls startMonitoring on the QuakeMonitor. startMonitoringQuakeMonitor上で呼び出します。

    extension QuakeMonitor {

    
     static var throwingQuakes: AsyncThrowingStream<Quake, Error> {
         AsyncThrowingStream { continuation in
             let monitor = QuakeMonitor()
             monitor.quakeHandler = { quake in
                 continuation.yield(quake)
             }
             monitor.errorHandler = { error in
                 continuation.finish(throwing: error)
             }
             continuation.onTermination = { @Sendable _ in
                 monitor.stopMonitoring()
             }
             monitor.startMonitoring()
         }
     }

    }

Because the stream is an AsyncSequence, the call point uses the for-await-in syntax to process each Quake instance as produced by the stream:


do {
    for try await quake in quakeStream {
        print ("Quake: \(quake.date)")
    }
    print ("Stream done.")
} catch {
    print ("Error: \(error)")
}

Topics 話題

Creating a Continuation-Based Stream

Finding Elements 要素を見つける

Selecting Elements 要素の選択

Excluding Elements 要素を除外する

Transforming a Sequence シーケンスを変形する

Creating an Iterator イテレータを作成する

Supporting Types 支援を行う型

Initializers イニシャライザ

Relationships 関係

Conforms To 次に準拠

See Also 参照

Asynchronous Sequences 非同期シーケンス