--- title: Transformation of sequences --- #Transformation of sequences {#TransformationOfSequences} The values from the sequences we consume are not always in the format we need. Sometimes there is too much noise in the data so we strip the values down. Sometimes each value needs to be expanded either into a richer object or into more values. By composing operators, Rx allows you to control the quality as well as the quantity of values in the observable sequences you consume. Up until now, we have looked at creation of sequences, transition into sequences, and, the reduction of sequences by filtering, aggregating or folding. In this chapter we will look at _transforming_ sequences. This allows us to introduce our third category of functional methods, _bind_. A bind function in Rx will take a sequence and apply some set of transformations on each element to produce a new sequence. To review:

*Ana(morphism) T --> IObservable*

*Cata(morphism) IObservable --> T*

*Bind IObservable --> IObservable*

Now that we have been introduced to all three of our higher order functions, you may find that you already know them. Bind and Cata(morphism) were made famous by [MapReduce](http://en.wikipedia.org/wiki/MapReduce) framework from Google. Here Google refer to Bind and Cata by their perhaps more common aliases; Map and Reduce. It may help to remember our terms as the `ABCs` of higher order functions.

*A*na enters the sequence. T --> IObservable

*B*ind modifies the sequence. IObservable --> IObservable

*C*ata leaves the sequence. IObservable --> T

##Select {#Select} The classic transformation method is `Select`. It allows you provide a function that takes a value of `TSource` and return a value of `TResult`. The signature for `Select` is nice and simple and suggests that its most common usage is to transform from one type to another type, i.e. `IObservable` to `IObservable`. IObservable Select( this IObservable source, Func selector) Note that there is no restriction that prevents `TSource` and `TResult` being the same thing. So for our first example, we will take a sequence of integers and transform each value by adding 3, resulting in another sequence of integers. var source = Observable.Range(0, 5); source.Select(i=>i+3) .Dump("+3") Output:
+3-->3
+3-->4
+3-->5
+3-->6
+3-->7
+3 completed
While this can be useful, more common use is to transform values from one type to another. In this example we transform integer values to characters. Observable.Range(1, 5); .Select(i =>(char)(i + 64)) .Dump("char"); Output:
char-->A
char-->B
char-->C
char-->D
char-->E
char completed
If we really want to take advantage of LINQ we could transform our sequence of integers to a sequence of anonymous types. Observable.Range(1, 5) .Select( i => new { Number = i, Character = (char)(i + 64) }) .Dump("anon"); Output:
anon-->{ Number = 1, Character = A }
anon-->{ Number = 2, Character = B }
anon-->{ Number = 3, Character = C }
anon-->{ Number = 4, Character = D }
anon-->{ Number = 5, Character = E }
anon completed
To further leverage LINQ we could write the above query using [query comprehension syntax](http://www.albahari.com/nutshell/linqsyntax.aspx). var query = from i in Observable.Range(1, 5) select new {Number = i, Character = (char) (i + 64)}; query.Dump("anon"); In Rx, `Select` has another overload. The second overload provides two values to the `selector` function. The additional argument is the element's index in the sequence. Use this method if the index of the element in the sequence is important to your selector function. ##Cast and OfType {#CastAndOfType} If you were to get a sequence of objects i.e. `IObservable`, you may find it less than useful. There is a method specifically for `IObservable` that will cast each element to a given type, and logically it is called `Cast()`. var objects = new Subject(); objects.Cast().Dump("cast"); objects.OnNext(1); objects.OnNext(2); objects.OnNext(3); objects.OnCompleted(); Output:
cast-->1
cast-->2
cast-->3
cast completed
If however we were to add a value that could not be cast into the sequence then we get errors. var objects = new Subject(); objects.Cast().Dump("cast"); objects.OnNext(1); objects.OnNext(2); objects.OnNext("3");//Fail Output:
cast-->1
cast-->2
cast failed -->Specified cast is not valid.
Thankfully, if this is not what we want, we could use the alternative extension method `OfType()`. var objects = new Subject(); objects.OfType().Dump("OfType"); objects.OnNext(1); objects.OnNext(2); objects.OnNext("3");//Ignored objects.OnNext(4); objects.OnCompleted(); Output:
OfType-->1
OfType-->2
OfType-->4
OfType completed
It is fair to say that while these are convenient methods to have, we could have created them with the operators we already know about. //source.Cast(); is equivalent to source.Select(i=>(int)i); //source.OfType(); source.Where(i=>i is int).Select(i=>(int)i); ##Timestamp and TimeInterval {#TimeStampAndTimeInterval} As observable sequences are asynchronous it can be convenient to know timings for when elements are received. The `Timestamp` extension method is a handy convenience method that wraps elements of a sequence in a light weight `Timestamped` structure. The `Timestamped` type is a struct that exposes the value of the element it wraps, and the timestamp it was created with as a `DateTimeOffset`. In this example we create a sequence of three values, one second apart, and then transform it to a time stamped sequence. The handy implementation of `ToString()` on `Timestamped` gives us a readable output. Observable.Interval(TimeSpan.FromSeconds(1)) .Take(3) .Timestamp() .Dump("TimeStamp"); Output
TimeStamp-->0@01/01/2012 12:00:01 a.m. +00:00
TimeStamp-->1@01/01/2012 12:00:02 a.m. +00:00
TimeStamp-->2@01/01/2012 12:00:03 a.m. +00:00
TimeStamp completed
We can see that the values 0, 1 & 2 were each produced one second apart. An alternative to getting an absolute timestamp is to just get the interval since the last element. The `TimeInterval` extension method provides this. As per the `Timestamp` method, elements are wrapped in a light weight structure. This time the structure is the `TimeInterval` type. Observable.Interval(TimeSpan.FromSeconds(1)) .Take(3) .TimeInterval() .Dump("TimeInterval"); Output:
TimeInterval-->0@00:00:01.0180000
TimeInterval-->1@00:00:01.0010000
TimeInterval-->2@00:00:00.9980000
TimeInterval completed
As you can see from the output, the timings are not exactly one second but are pretty close. ##Materialize and Dematerialize {#MaterializeAndDematerialize} The `Timestamp` and `TimeInterval` transform operators can prove useful for logging and debugging sequences, so too can the `Materialize` operator. `Materialize` transitions a sequence into a metadata representation of the sequence, taking an `IObservable` to an `IObservable>`. The `Notification` type provides meta data for the events of the sequence. If we materialize a sequence, we can see the wrapped values being returned. Observable.Range(1, 3) .Materialize() .Dump("Materialize"); Output:
Materialize-->OnNext(1)
Materialize-->OnNext(2)
Materialize-->OnNext(3)
Materialize-->OnCompleted()
Materialize completed
Note that when the source sequence completes, the materialized sequence produces an 'OnCompleted' notification value and then completes. `Notification` is an abstract class with three implementations: * OnNextNotification * OnErrorNotification * OnCompletedNotification `Notification` exposes four public properties to help you discover it: `Kind`, `HasValue`, `Value` and `Exception`. Obviously only `OnNextNotification` will return true for `HasValue` and have a useful implementation of `Value`. It should also be obvious that `OnErrorNotification` is the only implementation that will have a value for `Exception`. The `Kind` property returns an `enum` which should allow you to know which methods are appropriate to use. public enum NotificationKind { OnNext, OnError, OnCompleted, } In this next example we produce a faulted sequence. Note that the final value of the materialized sequence is an `OnErrorNotification`. Also that the materialized sequence does not error, it completes successfully. var source = new Subject(); source.Materialize() .Dump("Materialize"); source.OnNext(1); source.OnNext(2); source.OnNext(3); source.OnError(new Exception("Fail?")); Output:
Materialize-->OnNext(1)
Materialize-->OnNext(2)
Materialize-->OnNext(3)
Materialize-->OnError(System.Exception)
Materialize completed
Materializing a sequence can be very handy for performing analysis or logging of a sequence. You can unwrap a materialized sequence by applying the `Dematerialize` extension method. The `Dematerialize` will only work on `IObservable>`. ##SelectMany {#SelectMany} Of the transformation operators above, we can see that `Select` is the most useful. It allows very broad flexibility in its transformation output and can even be used to reproduce some of the other transformation operators. The `SelectMany` operator however is even more powerful. In LINQ and therefore Rx, the _bind_ method is `SelectMany`. Most other transformation operators can be built with `SelectMany`. Considering this, it is a shame to think that `SelectMany` may be one of the most misunderstood methods in LINQ. In my personal discovery of Rx, I struggled to grasp the `SelectMany` extension method. One of my colleagues helped me understand `SelectMany` better by suggesting I think of it as from one, select many. An even better definition is From one, select zero or more. If we look at the signature for `SelectMany` we see that it takes a source sequence and a function as its parameters. IObservable SelectMany( this IObservable source, Func> selector) The `selector` parameter is a function that takes a single value of `T` and returns a sequence. Note that the sequence the `selector` returns does not have to be of the same type as the `source`. Finally, the `SelectMany` return type is the same as the `selector` return type. This method is very important to understand if you wish to work with Rx effectively, so let's step through this slowly. It is also important to note its subtle differences to `IEnumerable`'s `SelectMany` operator, which we will look at soon. Our first example will take a sequence with the single value '3' in it. The selector function we provide will produce a further sequence of numbers. This result sequence will be a range of numbers from 1 to the value provided i.e. 3. So we take the sequence [3] and return the sequence [1,2,3] from our `selector` function. Observable.Return(3) .SelectMany(i => Observable.Range(1, i)) .Dump("SelectMany"); Output:
SelectMany-->1
SelectMany-->2
SelectMany-->3
SelectMany completed
If we modify our source to be a sequence of [1,2,3] like this... Observable.Range(1,3) .SelectMany(i => Observable.Range(1, i)) .Dump("SelectMany"); ...we will now get an output with the result of each sequence ([1], [1,2] and [1,2,3]) flattened to produce [1,1,2,1,2,3].
SelectMany-->1
SelectMany-->1
SelectMany-->2
SelectMany-->1
SelectMany-->2
SelectMany-->3
SelectMany completed
This last example better illustrates how `SelectMany` can take a `single` value and expand it to many values. When we then apply this to a `sequence` of values, the result is each of the child sequences combined to produce the final sequence. In both examples, we have returned a sequence that is the same type as the source. This is not a restriction however, so in this next example we return a different type. We will reuse the `Select` example of transforming an integer to an ASCII character. To do this, the `selector` function just returns a char sequence with a single value. Func letter = i => (char)(i + 64); Observable.Return(1) .SelectMany(i => Observable.Return(letter(i))); .Dump("SelectMany"); So with the input of [1] we return a sequence of [A].
SelectMany-->A
SelectMany completed
Extending the source sequence to have many values, will give us a result with many values. Func letter = i => (char)(i + 64); Observable.Range(1,3) .SelectMany(i => Observable.Return(letter(i))) .Dump("SelectMany"); Now the input of [1,2,3] produces [[A], [B], [C]] which is flattened to just [A,B,C].
SelectMany-->A
SelectMany-->B
SelectMany-->C
Note that we have effectively recreated the `Select` operator. The last example maps a number to a letter. As there are only 26 letters, it would be nice to ignore values greater than 26. This is easy to do. While we must return a sequence for each element of the source, there aren't any rules that prevent it from being an empty sequence. In this case if the element value is a number outside of the range 1-26 we return an empty sequence. Func letter = i => (char)(i + 64); Observable.Range(1, 30) .SelectMany( i => { if (0 < i && i < 27) { return Observable.Return(letter(i)); } else { return Observable.Empty(); } }) .Dump("SelectMany"); Output:
A
B
C
...
X
Y
Z
Completed
To be clear, for the source sequence [1..30], the value 1 produced a sequence [A], the value 2 produced a sequence [B] and so on until value 26 produced a sequence [Z]. When the source produced value 27, the `selector` function returned the empty sequence []. Values 28, 29 and 30 also produced empty sequences. Once all the sequences from the calls to the selector had been fattened to produce the final result, we end up with the sequence [A..Z]. Now that we have covered the third of our three higher order functions, let us take time to reflect on some of the methods we have already learnt. First we can consider the `Where` extension method. We first looked at this method in the chapter on [Reducing a sequence](05_Filtering.html#Where). While this method does reduce a sequence, it is not a fit for a functional _fold_ as the result is still a sequence. Taking this into account, we find that `Where` is actually a fit for _bind_. As an exercise, try to write your own extension method version of `Where` using the `SelectMany` operator. Review the last example for some help...
An example of a `Where` extension method written using `SelectMany`: public static IObservable Where(this IObservable source, Func predicate) { return source.SelectMany( item => { if (predicate(item)) { return Observable.Return(item); } else { return Observable.Empty(); } }); } Now that we know we can use `SelectMany` to produce `Where`, it should be a natural progression for you the reader to be able to extend this to reproduce other filters like `Skip` and `Take`. As another exercise, try to write your own version of the `Select` extension method using `SelectMany`. Refer to our example where we use `SelectMany` to convert `int` values into `char` values if you need some help...
An example of a `Select` extension method written using `SelectMany`: public static IObservable MySelect( this IObservable source, Func selector) { return source.SelectMany(value => Observable.Return(selector(value))); } ###IEnumerable vs. IObservable SelectMany {#IEnumerableVsIObservableSelectMany} It is worth noting the difference between the implementations of `IEnumerable` `SelectMany` and `IObservable` `SelectMany`. Consider that `IEnumerable` sequences are pull based and blocking. This means that when an `IEnumerable` is processed with a `SelectMany` it will pass one item at a time to the `selector` function and wait until it has processed all of the values from the `selector` before requesting (pulling) the next value from the source. Consider an `IEnumerable` source sequence of [1,2,3]. If we process that with a `SelectMany` operator that returns a sequence of [x*10, (x*10)+1, (x*10)+2], we would get the [[10,11,12], [20,21,22], [30,31,32]]. private IEnumerable GetSubValues(int offset) { yield return offset * 10; yield return (offset * 10) + 1; yield return (offset * 10) + 2; } We then apply the `GetSubValues` method with the following code: var enumerableSource = new [] {1, 2, 3}; var enumerableResult = enumerableSource.SelectMany(GetSubValues); foreach (var value in enumerableResult) { Console.WriteLine(value); } The resulting child sequences are flattened into [10,11,12,20,21,22,30,31,32].
10
11
12
20
21
22
30
31
32
The difference with `IObservable` sequences is that the call to the `SelectMany`'s `selector` function is not blocking and the result sequence can produce values over time. This means that subsequent 'child' sequences can overlap. Let us consider again a sequence of [1,2,3], but this time values are produced three second apart. The `selector` function will also produce sequence of [x*10, (x*10)+1, (x*10)+2] as per the example above, however these values will be four seconds apart. To visualize this kind of asynchronous data we need to represent space and time. ###Visualizing sequences {#VisualizingSequences} Let's divert quickly and talk about a technique we will use to help communicate the concepts relating to sequences. Marble diagrams are a way of visualizing sequences. Marble diagrams are great for sharing Rx concepts and describing composition of sequences. When using marble diagrams there are only a few things you need to know 1. a sequence is represented by a horizontal line 2. time moves to the right (i.e. things on the left happened before things on the right) 3. notifications are represented by symbols: *. '0' for OnNext *. 'X' for an OnError *. '|' for OnCompleted 4. many concurrent sequences can be visualized by creating rows of sequences This is a sample of a sequence of three values that completes:
--0--0--0-|
This is a sample of a sequence of four values then an error:
--0--0--0--0--X
Now going back to our `SelectMany` example, we can visualize our input sequence by using values in instead of the 0 marker. This is the marble diagram representation of the sequence [1,2,3] spaced three seconds apart (note each character represents one second).
--1--2--3|
Now we can leverage the power of marble diagrams by introducing the concept of time and space. Here we see the visualization of the sequence produced by the first value 1 which gives us the sequence [10,11,12]. These values were spaced four seconds apart, but the initial value is produce immediately.
1---1---1|
0   1   2|
As the values are double digit they cover two rows, so the value of 10 is not confused with the value 1 immediately followed by the value 0. We add a row for each sequence produced by the `selector` function.
--1--2--3|
 
  1---1---1|
  0   1   2|
 
     2---2---2|
     0   1   2|

	
        3---3---3|
        0   1   2|
Now that we can visualize the source sequence and its child sequences, we should be able to deduce the expected output of the `SelectMany` operator. To create a result row for our marble diagram, we simple allow the values from each child sequence to 'fall' into the new result row.
--1--2--3|
 
  1---1---1|
  0   1   2|
 
     2---2---2|
     0   1   2|

	
        3---3---3|
        0   1   2|

	
--1--21-321-32--3|
  0  01 012 12  2|

If we take this exercise and now apply it to code, we can validate our marble diagram. First our method that will produce our child sequences: private IObservable GetSubValues(long offset) { //Produce values [x*10, (x*10)+1, (x*10)+2] 4 seconds apart, but starting immediately. return Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(4)) .Select(x => (offset*10) + x) .Take(3); } This is the code that takes the source sequence to produce our final output: // Values [1,2,3] 3 seconds apart. Observable.Interval(TimeSpan.FromSeconds(3)) .Select(i => i + 1) //Values start at 0, so add 1. .Take(3) //We only want 3 values .SelectMany(GetSubValues) //project into child sequences .Dump("SelectMany"); The output produced matches our expectations from the marble diagram.
SelectMany-->10
SelectMany-->20
SelectMany-->11
SelectMany-->30
SelectMany-->21
SelectMany-->12
SelectMany-->31
SelectMany-->22
SelectMany-->32
SelectMany completed
We have previously looked at the `Select` operator when it is used in Query Comprehension Syntax, so it is worth noting how you use the `SelectMany` operator. The `Select` extension method maps quite obviously to query comprehension syntax, `SelectMany` is not so obvious. As we saw in the earlier example, the simple implementation of just suing select is as follows: var query = from i in Observable.Range(1, 5) select i; If we wanted to add a simple `where` clause we can do so like this: var query = from i in Observable.Range(1, 5) where i%2==0 select i; To add a `SelectMany` to the query, we actually add an extra `from` clause. var query = from i in Observable.Range(1, 5) where i%2==0 from j in GetSubValues(i) select j; //Equivalent to var query = Observable.Range(1, 5) .Where(i=>i%2==0) .SelectMany(GetSubValues); An advantage of using the query comprehension syntax is that you can easily access other variables in the scope of the query. In this example we select into an anon type both the value from the source and the child value. var query = from i in Observable.Range(1, 5) where i%2==0 from j in GetSubValues(i) select new {i, j}; query.Dump("SelectMany"); Output
SelectMany-->{ i = 2, j = 20 }
SelectMany-->{ i = 4, j = 40 }
SelectMany-->{ i = 2, j = 21 }
SelectMany-->{ i = 4, j = 41 }
SelectMany-->{ i = 2, j = 22 }
SelectMany-->{ i = 4, j = 42 }
SelectMany completed
--- This brings us to a close on Part 2. The key takeaways from this were to allow you the reader to understand a key principal to Rx: functional composition. As we move through Part 2, examples became progressively more complex. We were leveraging the power of LINQ to chain extension methods together to compose complex queries. We didn't try to tackle all of the operators at once, we approached them in groups. * Creation * Reduction * Inspection * Aggregation * Transformation On deeper analysis of the operators we find that most of the operators are actually specialization of the higher order functional concepts. We named them the ABC's of functional programming:
  • Anamorphism, aka:
    • Ana
    • Unfold
    • Generate
  • Bind, aka:
    • Map
    • SelectMany
    • Projection
    • Transform
  • Catamorphism, aka:
    • Cata
    • Fold
    • Reduce
    • Accumulate
    • Inject
Now you should feel that you have a strong understanding of how a sequence can be manipulated. What we have learnt up to this point however can all largely be applied to `IEnumerable` sequences too. Rx can be much more complex than what many people will have dealt with in `IEnumerable` world, as we have seen with the `SelectMany` operator. In the next part of the book we will uncover features specific to the asynchronous nature of Rx. With the foundation we have built so far we should be able to tackle the far more challenging and interesting features of Rx. ---

Additional recommended reading