Asynchrone
Extensions and additions to AsyncSequence, AsyncStream and AsyncThrowingStream.
Requirements
- iOS 14.0+
- macOS 12.0+
Installation
Swift Package Manager
In Xcode:
- Click Project.
- Click Package Dependencies.
- Click +.
- Enter package URL: https://github.com/reddavis/Asynchrone
- Add Asynchrone to your app target.
Documentation
Documentation can be found here.
Overview
AsyncSequence
AnyAsyncSequenceable
let sequence = Just(1)
.map(String.init)
.eraseToAnyAsyncSequenceable()
AnyThrowingAsyncSequenceable
let stream = Fail<Int, TestError>(error: TestError.a)
.eraseToAnyThrowingAsyncSequenceable()
CombineLatestAsyncSequence
let streamA = AsyncStream<Int> { continuation in
continuation.yield(1)
continuation.yield(2)
continuation.yield(3)
continuation.yield(4)
continuation.finish()
}
let streamB = AsyncStream<Int> { continuation in
continuation.yield(5)
continuation.yield(6)
continuation.yield(7)
continuation.yield(8)
continuation.yield(9)
continuation.finish()
}
for await value in streamA.combineLatest(streamB) {
print(value)
}
// Prints:
// (1, 5)
// (2, 6)
// (3, 7)
// (4, 8)
// (4, 9)
CombineLatest3AsyncSequence
let streamA = AsyncStream<Int> { continuation in
continuation.yield(1)
continuation.yield(2)
continuation.yield(3)
continuation.yield(4)
continuation.finish()
}
let streamB = AsyncStream<Int> { continuation in
continuation.yield(5)
continuation.yield(6)
continuation.yield(7)
continuation.yield(8)
continuation.yield(9)
continuation.finish()
}
let streamC = AsyncStream<Int> { continuation in
continuation.yield(10)
continuation.yield(11)
continuation.finish()
}
for await value in streamA.combineLatest(streamB, streamC) {
print(value)
}
// Prints:
// (1, 5, 10)
// (2, 6, 11)
// (3, 7, 11)
// (4, 8, 11)
// (4, 9, 11)
CurrentElementAsyncSequence
let sequence = CurrentElementAsyncSequence(0)
print(await sequence.element)
await sequence.yield(1)
print(await sequence.element)
await sequence.yield(2)
await sequence.yield(3)
await sequence.yield(4)
print(await sequence.element)
// Prints:
// 0
// 1
// 4
DebounceAsyncSequence
let stream = AsyncStream<Int> { continuation in
continuation.yield(0)
try? await Task.sleep(nanoseconds: 200_000_000)
continuation.yield(1)
try? await Task.sleep(nanoseconds: 200_000_000)
continuation.yield(2)
continuation.yield(3)
continuation.yield(4)
continuation.yield(5)
continuation.finish()
}
for try await element in stream.debounce(for: 0.1) {
print(element)
}
// Prints:
// 0
// 1
// 5
Fail
let stream = Fail<Int, TestError>(error: TestError())
do {
for try await value in stream {
print(value)
}
} catch {
print("Error!")
}
// Prints:
// Error!
Just
let stream = Just(1)
for await value in stream {
print(value)
}
// Prints:
// 1
MergeAsyncSequence
let streamA = AsyncStream<Int> { continuation in
continuation.yield(1)
continuation.yield(2)
continuation.yield(3)
continuation.yield(4)
continuation.finish()
}
let streamB = AsyncStream<Int> { continuation in
continuation.yield(5)
continuation.yield(6)
continuation.yield(7)
continuation.yield(8)
continuation.yield(9)
continuation.finish()
}
for await value in streamA.merge(with: streamB) {
print(value)
}
// Prints:
// 1
// 5
// 2
// 6
// 3
// 7
// 4
// 8
// 9
Merge3AsyncSequence
let streamA = AsyncStream<Int> { continuation in
continuation.yield(1)
continuation.yield(4)
continuation.finish()
}
let streamB = AsyncStream<Int> { continuation in
continuation.yield(2)
continuation.finish()
}
let streamC = AsyncStream<Int> { continuation in
continuation.yield(3)
continuation.finish()
}
for await value in streamA.merge(with: streamB, streamC) {
print(value)
}
// Prints:
// 1
// 2
// 3
// 4
PassthroughAsyncSequence
let sequence = PassthroughAsyncSequence<Int>()
sequence.yield(0)
sequence.yield(1)
sequence.yield(2)
sequence.finish()
for await value in sequence {
print(value)
}
// Prints:
// 0
// 1
// 2
RemoveDuplicatesAsyncSequence
let stream = AsyncStream<Int> { continuation in
continuation.yield(1)
continuation.yield(1)
continuation.yield(2)
continuation.yield(3)
continuation.finish()
}
for await value in stream.removeDuplicates() {
print(value)
}
// Prints:
// 1
// 2
// 3
ReplaceErrorAsyncSequence
let sequence = Fail<Int, TestError>(
error: TestError()
)
.replaceError(with: 0)
for await value in sequence {
print(value)
}
// Prints:
// 0
SharedAsyncSequence
let values = [
"a",
"ab",
"abc",
"abcd"
]
let stream = AsyncStream { continuation in
for value in values {
continuation.yield(value)
}
continuation.finish()
}
.shared()
Task {
let values = try await self.stream.collect()
// ...
}
Task.detached {
let values = try await self.stream.collect()
// ...
}
let values = try await self.stream.collect()
// ...
ThrottleAsyncSequence
let stream = AsyncStream<Int> { continuation in
continuation.yield(0)
try? await Task.sleep(nanoseconds: 100_000_000)
continuation.yield(1)
try? await Task.sleep(nanoseconds: 100_000_000)
continuation.yield(2)
continuation.yield(3)
continuation.yield(4)
continuation.yield(5)
continuation.finish()
}
for try await element in stream.throttle(for: 0.05, latest: true) {
print(element)
}
// Prints:
// 0
// 1
// 2
// 5
ThrowingPassthroughAsyncSequence
let sequence = ThrowingPassthroughAsyncSequence<Int>()
sequence.yield(0)
sequence.yield(1)
sequence.yield(2)
sequence.finish(throwing: TestError())
do {
for try await value in sequence {
print(value)
}
} catch {
print("Error!")
}
// Prints:
// 0
// 1
// 2
// Error!
ZipAsyncSequence
let streamA = AsyncStream<Int> { continuation in
continuation.yield(1)
continuation.yield(2)
continuation.finish()
}
let streamB = AsyncStream<Int> { continuation in
continuation.yield(5)
continuation.yield(6)
continuation.yield(7)
continuation.finish()
}
for await value in streamA.zip(streamB) {
print(value)
}
// Prints:
// (1, 5)
// (2, 6)
Zip3AsyncSequence
let streamA = AsyncStream<Int> { continuation in
continuation.yield(1)
continuation.yield(2)
continuation.finish()
}
let streamB = AsyncStream<Int> { continuation in
continuation.yield(5)
continuation.yield(6)
continuation.yield(7)
continuation.finish()
}
let streamC = AsyncStream<Int> { continuation in
continuation.yield(8)
continuation.yield(9)
continuation.finish()
}
for await value in streamA.zip(streamB, streamC) {
print(value)
}
// Prints:
// (1, 5, 8)
// (2, 6, 9)
AsyncStream
AsyncStream.Continuation
AsyncThrowingStream
AsyncThrowingStream.Continuation
License
Whatevs.