Apple Combine Basics PART I

Reactive programming is not new, but interest in it has grown sharply in recent years. The reason is simple: traditional imperative programming has limitations when it comes to today’s demands, where applications must be highly available and keep response times low even under high load. That is why we decided to take a look at Combine, Apple’s new reactive programming framework.

And if you feel like this is the exact solution for your problems, contact us at Evolvice and we will gladly explore the possibilities together with you.

Excited? Let’s dive deep into the world of reactive programming!

Imperative vs. Reactive programming

Reactive programming is a declarative programming paradigm based on data streams and change propagation. In other words, you have some source of events (a stream), you observe that source, and you react when it emits a value or event. The source of events (i.e. the event stream) can be any variable that changes over time or any external event, such as mouse clicks, I/O events, HTTP requests, etc.

Let’s take a look at an example:

a = b + c

In imperative programming (which includes most object-oriented code), a depends on b and c only at the moment the expression is evaluated. The machine calculates b + c and assigns the result to a. If b or c changes afterwards, a is not affected. In reactive programming, a depends on b and c until you break that dependency intentionally. Whenever b or c changes, a reacts and changes too (in our example, a is recalculated).
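
To make the difference concrete, here is a minimal sketch in Swift that uses Combine itself (the framework is introduced later in this article). The names bSubject, cSubject and sums are made up for this illustration, and combineLatest is only one possible way to express the dependency.

```swift
import Combine

// Imperative: a is computed once and then forgets about b and c.
var b = 1
let c = 2
let a = b + c        // 3
b = 10               // a is still 3

// Reactive: the sum is emitted again whenever either input changes.
let bSubject = CurrentValueSubject<Int, Never>(1)
let cSubject = CurrentValueSubject<Int, Never>(2)
var sums: [Int] = []

let subscription = bSubject
    .combineLatest(cSubject) { $0 + $1 }
    .sink { sums.append($0) }

bSubject.send(10)    // the sum becomes 12
cSubject.send(20)    // the sum becomes 30

print(a, sums)       // 3 [3, 12, 30]
```

The imperative a stays at 3, while the reactive pipeline keeps producing a new sum for every change until the subscription is canceled.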

A bit of history…

The first ideas of reactive programming appeared in the 1970s, but the first big splash was made with Microsoft’s introduction of Reactive Extensions (Rx) for .NET. Rx tried to solve the problem of asynchronous data access from an event-based UI. The standard Iterator pattern, where the client asks for data, is not applicable to asynchronous data access. To solve this problem, Rx inverted the control flow using the Observer pattern: instead of requesting the data as in the Iterator pattern, the client simply waits for the data to be sent to it.

After Rx was introduced and successfully used both for solving domain problems and for binding the ViewModel to the UI, reactive frameworks appeared for other programming languages. For Objective-C there are frameworks such as Bond, ReactiveCocoa and ReactiveObjC. For Swift, RxSwift is probably the most widely used reactive framework.

In 2019 Apple presented its own reactive framework, Combine. “Combine is a unified declarative framework for processing values over time,” Apple says. Even though Apple doesn’t define Combine as an Rx framework, in practice it is one.

Introduction to Combine

The heart of the Combine framework is the Observer pattern. Let’s recall how it works. The subject (also called Observable) can change its state and notify Observers about the change.

The Observable analog in Combine is called the Publisher, and the Observer is called the Subscriber. The subscriber subscribes to the publisher. The publisher can emit events/values, and the subscriber receives those events/values and reacts to them.

Publishers are represented by the generic protocol Publisher:

public protocol Publisher {
    associatedtype Output
    associatedtype Failure : Error

    func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
}

A subscriber is represented by the generic protocol Subscriber:

public protocol Subscriber : CustomCombineIdentifierConvertible {
    associatedtype Input
    associatedtype Failure : Error

    func receive(subscription: Subscription)
    func receive(_ input: Self.Input) -> Subscribers.Demand
    func receive(completion: Subscribers.Completion<Self.Failure>)
}

A publisher has only an Output (it can only emit values), and a subscriber has only an Input (it can only receive values). A publisher can also emit a failure, and a subscriber can receive it and react to it; this is what the Failure type describes. We will talk about it a little more in the error handling section.

In order to connect a subscriber to a publisher, their types must match, i.e. Publisher.Output must be the same type as Subscriber.Input, and Publisher.Failure the same type as Subscriber.Failure. This requirement is expressed by the where clause of receive(subscriber:) in the Publisher protocol. Combine already ships with a number of publishers and subscribers (you can also implement your own using the two protocols).
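
As an illustration of implementing the Subscriber protocol yourself, here is a minimal sketch. CollectingSubscriber is a hypothetical type written for this article, not part of Combine; it simply stores every value it receives. Its Input (Int) and Failure (Never) match the Output and Failure of the array publisher it is attached to, which is why the connection compiles.

```swift
import Combine

// Hypothetical subscriber for illustration; not part of Combine.
final class CollectingSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private(set) var values: [Int] = []
    private(set) var isFinished = false

    // Called once when the subscriber is attached to a publisher.
    func receive(subscription: Subscription) {
        subscription.request(.unlimited)   // ask for as many values as the publisher has
    }

    // Called for every value; the return value can request additional items.
    func receive(_ input: Int) -> Subscribers.Demand {
        values.append(input)
        return .none
    }

    // Called at most once, when the publisher finishes (it cannot fail here).
    func receive(completion: Subscribers.Completion<Never>) {
        isFinished = true
    }
}

let numbers = [1, 2, 3].publisher          // Output == Int, Failure == Never
let collector = CollectingSubscriber()
numbers.subscribe(collector)

print(collector.values, collector.isFinished)   // [1, 2, 3] true
```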

Let’s play with them in a Playground (do not forget to import Combine). We will build a very simple system made of a subscriber connected to a publisher. We will call such a system a “pipeline”.

import Cocoa
import Combine

let publisher = PassthroughSubject<Int, Never>()
// Keep the returned AnyCancellable; releasing it cancels the subscription.
let cancellable = publisher.sink { (value: Int) in
    print("received value \(value)")
}

publisher.send(1)
publisher.send(5)

The output is:

received value 1
received value 5

PassthroughSubject is a special kind of publisher called a Subject: it lets us push values into the pipeline by calling send(_:).

PassthroughSubject<Int, Never> means we created a publisher that can publish Int values and can never fail (Never is used as the Failure type).

sink attaches one of the already implemented subscribers, Subscribers.Sink.

But what if we also want to handle errors? Let’s first define an error enum:

enum PublisherError: String, Error {
    case error1 = "error type 1"
    case error2 = "error type 2"
}

and change our publisher and subscriber.

let publisher = PassthroughSubject<Int, PublisherError>()
let cancellable = publisher.sink(receiveCompletion: { (completion: Subscribers.Completion<PublisherError>) in
    switch completion {
    case .failure(let error):
        print("error: \(error)")
    case .finished:
        print("Finished")
    }
}, receiveValue: { (value: Int) -> Void in
    print("received value \(value)")
})

publisher.send(1)
publisher.send(5)
publisher.send(completion: .failure(.error2))

Because the publisher is now able to send errors, we need to provide the receiveCompletion closure to the subscriber. The completion argument is just an enum with two possible cases: failure (which carries the error) and finished.

When we run the playground, the output is

received value 1
received value 5
error: error2

Closing a Pipeline

As we see from the example above, after we create a pipeline we are able to send multiple events over it. The pipeline is not closed/destroyed after the first event arrives.

We can close the pipeline by sending Completion.finished or Completion.failure from the publisher. Consider the following examples:

publisher.send(1)
publisher.send(5)
publisher.send(completion: .finished)
publisher.send(8)

Output

received value 1
received value 5
Finished

or with .failure

publisher.send(1)
publisher.send(5)
publisher.send(completion: .failure(.error1))
publisher.send(8)

Output

received value 1
received value 5
error: error1

After we send a completion, the pipeline is closed, so publisher.send(8) has no effect. The publisher cannot be reused either: a subscriber attached after the completion receives no further values, only the completion itself.
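
The behaviour of a late subscriber can be checked with a small sketch. The names subject, events and lateSubscription are made up for this illustration; the sketch assumes a PassthroughSubject, as in the examples above.

```swift
import Combine

let subject = PassthroughSubject<Int, Never>()
subject.send(completion: .finished)        // the subject is now closed

var events: [String] = []

// Subscribing after the completion was sent.
let lateSubscription = subject.sink(
    receiveCompletion: { _ in events.append("finished") },
    receiveValue: { events.append("value \($0)") }
)

subject.send(8)                            // ignored: the subject has already completed

print(events)                              // ["finished"]
```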

But what if we just want to cancel a subscription, and perhaps subscribe to the publisher again some time later?

When we subscribe to a publisher with sink, we get an instance of AnyCancellable, which has a cancel() method for canceling the subscription.

let cancellable = publisher.sink(receiveCompletion: { (completion: Subscribers.Completion<PublisherError>) in
    switch completion {
    case .failure(let error):
        print("error: \(error)")
    case .finished:
        print("Finished")
    }
}, receiveValue: { (value: Int) -> Void in
    print("received value \(value)")
})

publisher.send(1)
publisher.send(5)
cancellable.cancel()
publisher.send(8) // not delivered: the subscription was canceled

// Subscribe again; the publisher itself is still alive.
let secondCancellable = publisher.sink(receiveCompletion: {
    print($0)
}) {
    print("value \($0)")
}

publisher.send(10)

Output

received value 1
received value 5
value 10

Operators

Sometimes the value emitted by the publisher needs to be processed before it gets to the subscriber. We can accomplish this with operators. An operator behaves as a Subscriber towards the publisher upstream of it (it has an Input) and as a Publisher towards the subscriber downstream of it (it has an Output).

Let’s convert the Int value from our previous example to a String before it gets to the subscriber:

let publisher = PassthroughSubject<Int, PublisherError>()
let cancellable = publisher
    .map { (inputValue: Int) -> String in
        "\(inputValue)"
    }
    .sink(receiveCompletion: { (completion: Subscribers.Completion<PublisherError>) in
        switch completion {
        case .failure(let error):
            print("error: \(error)")
        case .finished:
            print("Finished")
        }
    }, receiveValue: { (value: String) -> Void in
        print("received value \(value)")
    })

publisher.send(1)
publisher.send(5)

Output

received value 1
received value 5

We can chain different operators. For example, let’s filter out odd numbers with the filter operator:

let publisher = PassthroughSubject<Int, PublisherError>()
let cancellable = publisher
    .filter { (input: Int) -> Bool in
        input % 2 == 0
    }
    .map { (inputValue: Int) -> String in
        "\(inputValue)"
    }
    .sink(receiveCompletion: { (completion: Subscribers.Completion<PublisherError>) in
        switch completion {
        case .failure(let error):
            print("error: \(error)")
        case .finished:
            print("Finished")
        }
    }, receiveValue: { (value: String) -> Void in
        print("received value \(value)")
    })

publisher.send(1)
publisher.send(5)
publisher.send(6)

Output

received value 6

Combine has many operators, which can be categorized into:

  • mapping
  • filtering
  • reducing
  • mathematical operations on elements
  • applying matching criteria to elements
  • applying sequence operations to elements
  • selecting specific elements
  • combining elements from multiple publishers
  • handling errors
  • adapting publisher types
  • controlling timing
  • encoding and decoding
  • debugging
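
To give a feel for a few of these categories, here is a short sketch using three operators (reduce, removeDuplicates and merge). The variable names and the log array are made up for this illustration; this is only a small sample, not an overview of everything Combine offers.

```swift
import Combine

var log: [String] = []

// Reducing: fold all values into one, emitted when the upstream finishes.
let sum = [1, 2, 3, 4].publisher
    .reduce(0) { $0 + $1 }
    .sink { log.append("sum \($0)") }

// Filtering: drop consecutive duplicates.
let unique = [1, 1, 2, 2, 3].publisher
    .removeDuplicates()
    .sink { log.append("unique \($0)") }

// Combining: merge two publishers that share Output and Failure types.
let left = PassthroughSubject<Int, Never>()
let right = PassthroughSubject<Int, Never>()
let merged = left.merge(with: right)
    .sink { log.append("merged \($0)") }

left.send(10)
right.send(20)

print(log)
// ["sum 10", "unique 1", "unique 2", "unique 3", "merged 10", "merged 20"]
```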

For more, refer to the official documentation

Error handling

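Apple’s engineers made error handling in Combine explicit and type-safe: the error type is part of every publisher’s signature (unlike some third-party Rx frameworks, such as RxSwift, where the error type is not part of the observable’s type).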

If an error can never happen, we can use Never to make it explicit (as we did in our first example).
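
A small sketch of what this means in practice: when the Failure type is Never, the value-only form of sink is available, whereas a publisher that can fail must also be given a completion handler. DemoError and the log array are hypothetical names introduced only for this example.

```swift
import Combine

var log: [String] = []

// Just can never fail, so its Failure type is Never
// and the value-only form of sink can be used.
let infallible = Just(42)
let first = infallible.sink { log.append("value \($0)") }

// A publisher with a real Failure type needs a completion handler as well.
struct DemoError: Error {}     // hypothetical error type for this sketch
let fallible = Fail<Int, DemoError>(error: DemoError())
let second = fallible.sink(
    receiveCompletion: { completion in
        if case .failure = completion { log.append("failed") }
    },
    receiveValue: { log.append("value \($0)") }
)

print(log)                     // ["value 42", "failed"]
```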

Convert error types

Sometimes we need to connect a subscriber to a publisher, but they have different error types. For example, we have a search engine which exposes a publisher. The search engine publisher defines its own error type, SearchError. Every time the search engine gets a search request, it processes it and sends the result via the publisher. Let’s say our UI code is the subscriber, which shows the search results somewhere in the window. It defines its own error type, ServiceError.


import Cocoa
import Combine

enum SearchError: Error {
    case notFound
    case timeExpired
}

enum ServiceError: Error {
    case emptyResponse
    case busy
}

let searchEnginePublisher = PassthroughSubject<String, SearchError>()
let subscriber = Subscribers.Sink<String, ServiceError>(receiveCompletion: { _ in }) { print($0) }

searchEnginePublisher.subscribe(subscriber)

If we run this code, we will get a compiler error:

“Instance method ‘subscribe’ requires the types ‘SearchError’ and ‘ServiceError’ be equivalent”. To fix this, we need to convert SearchError to ServiceError. In Combine we can do so with the mapError operator, just like we used the map operator earlier to convert the publisher’s output type to the subscriber’s input type.

searchEnginePublisher
    .mapError { (error: SearchError) -> ServiceError in
        switch error {
        case .notFound:
            return .emptyResponse
        case .timeExpired:
            return .busy
        }
    }
    .subscribe(subscriber)

Let’s try it:

searchEnginePublisher.send("hello")
searchEnginePublisher.send(completion: .failure(.notFound))
searchEnginePublisher.send("world")

Output

hello

As we see, after we sent the error, the pipeline terminated, so the string “world” is not sent to the subscriber.

replaceError()

In our search engine example, the error handling strategy could be to use a placeholder value instead of propagating the error. In our case we could show an “empty result” string. Combine provides the replaceError(with:) operator for this.

// subscriber is now declared as Subscribers.Sink<String, Never>
searchEnginePublisher
    .replaceError(with: "empty result")
    .subscribe(subscriber)

searchEnginePublisher.send("hello")
searchEnginePublisher.send(completion: .failure(.notFound))
searchEnginePublisher.send("world")

Output

hello
empty result

Please note the following:

  • we replace ANY error with the same placeholder value (i.e. we can’t distinguish between different errors).
  • because every error is replaced, the subscriber will never receive an error. So we changed its type from Subscribers.Sink<String, ServiceError> to Subscribers.Sink<String, Never>.
  • an error was still emitted by the publisher, so the pipeline is closed after the placeholder is delivered.
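
The last point can be observed directly with a self-contained sketch that records both values and the completion. It uses sink instead of a separately declared subscriber, and the events array is a name made up for this illustration.

```swift
import Combine

enum SearchError: Error {
    case notFound
    case timeExpired
}

let searchEnginePublisher = PassthroughSubject<String, SearchError>()
var events: [String] = []

let cancellable = searchEnginePublisher
    .replaceError(with: "empty result")
    .sink(
        receiveCompletion: { completion in
            // Failure is Never at this point, so .finished is the only possible case.
            if case .finished = completion { events.append("finished") }
        },
        receiveValue: { events.append($0) }
    )

searchEnginePublisher.send("hello")
searchEnginePublisher.send(completion: .failure(.notFound))
searchEnginePublisher.send("world")        // dropped: the pipeline is closed

print(events)                              // ["hello", "empty result", "finished"]
```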

Catch

If we want an individual “placeholder” value for each error, we can use the catch operator. Its closure receives the error and returns a new publisher that replaces the failed one.

searchEnginePublisher
    .catch { (error: SearchError) -> Just<String> in
        switch error {
        case .notFound:
            return Just("Not found")
        case .timeExpired:
            return Just("Engine is busy. Try again")
        }
    }
    .subscribe(subscriber)

searchEnginePublisher.send("hello")
searchEnginePublisher.send(completion: .failure(.timeExpired))
searchEnginePublisher.send("world")

Output

hello
Engine is busy. Try again

Please note the following:

  • catch passes the error to its closure, so we can replace a specific error with its own “placeholder” publisher (unlike replaceError, which never sees the error).
  • because errors are replaced, the subscriber should have the Failure type Never.
  • the error was still emitted by the publisher, so the pipeline is closed once the placeholder has been delivered.
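
The replacement does not have to be a Just. As a sketch, the closure below returns a sequence publisher, so several fallback values are delivered after the failure. cachedResults and shown are hypothetical names used only in this example.

```swift
import Combine

enum SearchError: Error {
    case notFound
    case timeExpired
}

let searchEnginePublisher = PassthroughSubject<String, SearchError>()
var shown: [String] = []

// Hypothetical fallback data used only for this sketch.
let cachedResults = ["cached: swift", "cached: combine"]

let cancellable = searchEnginePublisher
    // catch swaps the failed upstream for the publisher returned by the closure.
    .catch { (_: SearchError) -> Publishers.Sequence<[String], Never> in
        cachedResults.publisher
    }
    .sink { shown.append($0) }

searchEnginePublisher.send("hello")
searchEnginePublisher.send(completion: .failure(.timeExpired))
searchEnginePublisher.send("world")        // dropped: the original publisher has failed

print(shown)                               // ["hello", "cached: swift", "cached: combine"]
```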

Producing errors

Sometimes an operator in the pipeline needs to produce an error itself. Here we go back to the initial version of our search engine example (without error replacement). If the publisher emits an empty string, it means there is no search result. So, instead of the empty string we expect the SearchError.notFound error. We can achieve this by throwing the error in such a case. Combine has a family of operators with the “try” prefix in their names; all of them can produce an error by throwing it. We will use the tryMap operator.

searchEnginePublisher
    .tryMap { (value: String) throws -> String in
        guard !value.isEmpty else {
            throw SearchError.notFound
        }
        return value
    }
    .mapError { (error) -> SearchError in
        // Safe here only because tryMap above throws nothing but SearchError.
        return error as! SearchError
    }
    .catch { (error: SearchError) -> Just<String> in
        switch error {
        case .notFound:
            return Just("Not found")
        case .timeExpired:
            return Just("Engine is busy. Try again")
        }
    }
    .subscribe(subscriber)

searchEnginePublisher.send("hello")
searchEnginePublisher.send("")
searchEnginePublisher.send("world")

Output

hello
Not found

In the output we see only the first word “hello” and “Not found”, because we sent an empty string, which was converted to an error in the tryMap operator and then converted to a “placeholder” string in the catch operator.

Please note:

  • tryMap erases the error type: the publisher it returns has the generic Error as its Failure. But the next operator in our pipeline expects a SearchError.
  • We convert the Error (produced by the tryMap operator) back to SearchError with the mapError operator.
  • When we throw an error, the pipeline is terminated (even though the error was sent by an operator, not by the original publisher).
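
The force cast in mapError crashes if anything other than a SearchError is ever thrown. One possible alternative, shown here as a sketch, is a conditional cast with a fallback case; choosing .notFound as the fallback is an assumption made for this example, not a rule. The shown array is a made-up name used to record what the subscriber receives.

```swift
import Combine

enum SearchError: Error {
    case notFound
    case timeExpired
}

let searchEnginePublisher = PassthroughSubject<String, SearchError>()
var shown: [String] = []

let cancellable = searchEnginePublisher
    .tryMap { (value: String) throws -> String in
        guard !value.isEmpty else { throw SearchError.notFound }
        return value
    }
    // Conditional cast instead of as!: unknown errors fall back to .notFound
    // (the fallback case is an assumption of this sketch).
    .mapError { error -> SearchError in
        (error as? SearchError) ?? .notFound
    }
    .catch { (error: SearchError) -> Just<String> in
        switch error {
        case .notFound:
            return Just("Not found")
        case .timeExpired:
            return Just("Engine is busy. Try again")
        }
    }
    .sink { shown.append($0) }

searchEnginePublisher.send("hello")
searchEnginePublisher.send("")             // turned into SearchError.notFound by tryMap
searchEnginePublisher.send("world")        // dropped: the pipeline is closed

print(shown)                               // ["hello", "Not found"]
```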

Conclusion:

Congratulations! If you are still reading this blog, you’ve studied the basics of Combine, and now you are ready to write your amazing apps using this nice technology. We’ve covered the main components of the Combine framework: publishers, subscribers, and operators. We’ve also studied error handling and the basic life cycle of a pipeline, and we’ve seen how easy it is to set one up.

In the next part, we will use our knowledge to create a complex pipe for interacting with a REST API. The interaction will be done in a few steps. Also, we will implement some error handling strategy.

Are you excited? Go to Apple Combine Intro Part II.