Coroutines for Ruby

A library for creating and composing producer/transformer/consumer coroutines. Producers are already provided by Ruby's built-in Enumerator class; this library provides Transformer and Consumer classes that work analogously. In particular, they are also based on Fiber and not on threads (as in some other producer/consumer libraries). Also provides a module Sink, which is analogous to Enumerable, and Enumerable/Transformer/Sink composition.

rdoc API docs

nome.github.io/coroutines

source code

github.com/nome/coroutines

bug tracker

github.com/nome/coroutines/issues

Installing

gem install coroutines

Using


Major change in version 0.2.0: The preferred way of connecting enumerables, transformers and sinks is now to use Enumerable#out_connect, Sink#in_connect, Transformer#out_connect and Transformer#in_connect. I have come to think that descriptive names are more idiomatic Ruby, while the operator notation is rather Haskellish. The <= and >= operators are still provided by default for backwards compatibility, and for comparing the relative benefits of both notations. However, please consider using them without explicitly requiring 'coroutines/operators' as deprecated. Feedback welcome.


A simple consumer:

require 'coroutines'

def counter(start)
  result = start
  loop { result += await }
  "Final value: #{result}"
end

co = consum_for :counter, 10  # => #<Consumer: main:counter (running)>
co << 10 << 1000 << 10000
co.close  # => "Final value: 11020"

The call to Consumer#close raises StopIteration at the point at which the consumer last executed await. In this case, the StopIteration is caught by loop, causing it to terminate.
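The await/close interplay can be approximated with a bare Fiber. This is only a sketch of the idea, not the library's implementation: await_value and the CLOSE sentinel are hypothetical names introduced for illustration, and closing is modelled by resuming the fiber with the sentinel so that StopIteration is raised at the suspended await.

```ruby
# Hypothetical sentinel that stands for "the consumer is being closed".
CLOSE = Object.new

# Hypothetical stand-in for await: suspend the fiber until a value is fed in,
# and raise StopIteration if the value is the CLOSE sentinel.
def await_value
  value = Fiber.yield
  raise StopIteration if value.equal?(CLOSE)
  value
end

counter = Fiber.new do
  result = 10
  loop { result += await_value }  # Kernel#loop rescues StopIteration
  "Final value: #{result}"
end

counter.resume                                  # run up to the first await
[10, 1000, 10000].each { |v| counter.resume(v) } # feed values one by one
final = counter.resume(CLOSE)                    # => "Final value: 11020"
```

Because the fiber body finishes after loop terminates, the last resume returns the value of the block, which plays the role of the return value of close.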

Note that this is an intentionally simplistic example intended to show the basic library API. Of course, a counter could just as easily be implemented using a closure; the advantage of a consumer is that the implementation could involve arbitrary control structures with multiple calls to await.

Often, a consumer can be rewritten using an enumerator. In the above example, we could also write

def counter(start, values)
  result = start
  values.each { |x| result += x }
  "Final value: #{result}"
end

values = [10, 1000, 10000] # could be an enumerator fetching values lazily
counter(10, values)

Depending on the context, either solution may be more readable. But there's one thing you can do with consumers but not with enumerators: lazily feeding the same values to more than one function. With enumerators, iteration is driven from the consuming side; with consumers, it is driven from the producing side. Distributing a stream to many consumers (or, more generally, sinks) can be done using Sink::Multicast. See the Apache log parsing example for a practical application of multicasting.
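To make the push-driven idea concrete, here is a minimal sketch in plain Ruby. FanOut is a hypothetical stand-in written for this illustration; it is not the library's Sink::Multicast and says nothing about how that class is implemented. It only shows that when the producing side drives iteration, each value can be handed to several receivers as soon as it is produced.

```ruby
# Hypothetical helper for illustration only; not the library's Sink::Multicast.
class FanOut
  def initialize(*sinks)
    @sinks = sinks
  end

  # Push one value to every sink in turn; return self so calls can be chained.
  def <<(value)
    @sinks.each { |sink| sink << value }
    self
  end
end

words = []           # first receiver: collects the values
text = String.new    # second receiver: concatenates them
fan = FanOut.new(words, text)

%w[a b c].each { |w| fan << w }  # the producer drives the iteration

words  # => ["a", "b", "c"]
text   # => "abc"
```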

A simple transformer:

require 'coroutines'

def running_sum(start)
  result = start
  loop { result += await; yield result }
end

tr = trans_for :running_sum, 3  # => #<Transformer: main:running_sum>
sums = tr.in_connect(1..10)  # => #<Enumerator::Lazy: #<Transformer: main:running_sum> <= 1..10>
sums.to_a  # => [4, 6, 9, 13, 18, 24, 31, 39, 48, 58]

tr = trans_for :running_sum, 0  # => #<Transformer: main:running_sum>
collect_sums = tr.out_connect([])  # => #<Consumer: #<Transformer: main:running_sum> >= []>
collect_sums << 1 << 1 << 2 << 3 << 5
collect_sums.close  # => [1, 2, 4, 7, 12]

Again, this is just a basic demonstration of the API that could be written without resorting to coroutines (though probably not quite as succinctly).
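For comparison, a coroutine-free version of the pull-driven case might look like the sketch below. It uses only Ruby's built-in Enumerator and Enumerator::Lazy (Ruby 2.0 or later); running_sum_enum is a hypothetical name chosen here to avoid clashing with running_sum above.

```ruby
# Hypothetical coroutine-free variant: wraps any enumerable in a lazy
# enumerator that yields the running sum after each input value.
def running_sum_enum(start, values)
  Enumerator.new do |out|
    result = start
    values.each do |x|
      result += x
      out << result
    end
  end.lazy
end

sums = running_sum_enum(3, 1..10).to_a
# => [4, 6, 9, 13, 18, 24, 31, 39, 48, 58]

# Laziness means an unbounded source is fine as long as we only take a prefix.
head = running_sum_enum(0, 1..Float::INFINITY).first(3)
# => [1, 3, 6]
```

Note that this variant has to be handed its input up front; it does not cover the push-driven use shown with out_connect, where values arrive one by one via <<.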

Sources and sinks

As you probably know, many Ruby classes use the Enumerable mixin to provide common functionality like mapping and filtering of sequences of values. We can think of such classes as “sources” of the values yielded by their respective each methods. In the same way, several classes use the << operator to accept sequences of values:

$stdout << "a" << "b" << "c"  # prints "abc"
[] << "a" << "b" << "c"  # => ["a", "b", "c"]
"" << "a" << "b" << "c"  # => "abc"

The coroutines library provides the mixin Sink for such classes. Among other methods, this provides Sink#in_connect, which “connects” an enumerable (as a “source” of values) to a sink's “input”. Upon connecting, the enumerable is iterated and each value is appended to the sink; then the sink is closed (by calling its close method). Sink#in_connect returns whatever close returns, which defaults to simply returning the sink itself (see Sink#close).

open("test.txt", "w").in_connect(["a", "b", "c"])  # write "abc" to test.txt
["a", "b", "c"].in_connect("d".."f")  # => ["a", "b", "c", "d", "e", "f"]
"abc".in_connect("d".."f")  # => "abcdef"

Note that for IO/File objects, this implies that the file descriptor will be closed after in_connect finishes. If this is not what you want, use dup:

$stdout.dup.in_connect(["a", "b", "c"])  # print "abc"

For symmetry, the coroutines library also implements Enumerable#out_connect, which mirrors Sink#in_connect:

("d".."f").out_connect("abc")  # => "abcdef"
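The connect semantics described in this section (iterate the source, append each value with <<, then close the sink) can be approximated in a few lines of plain Ruby. The helper feed below is hypothetical and only mimics the described behaviour; it is not the library's code. It assumes that a sink without a close method is simply returned as-is, which matches the described default of Sink#close.

```ruby
require 'stringio'

# Hypothetical helper approximating the connect semantics described above:
# append every value from the source, then close the sink if it can be closed.
def feed(sink, source)
  source.each { |value| sink << value }
  sink.respond_to?(:close) ? sink.close : sink
end

letters = feed(%w[a b c], "d".."f")        # => ["a", "b", "c", "d", "e", "f"]
text = feed(String.new("abc"), "d".."f")   # => "abcdef"

io = StringIO.new
feed(io, %w[a b c])
io.string    # => "abc"
io.closed?   # => true, which is why dup is suggested for $stdout
```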

Pipelines involving transformers

We'll be re-using the running_sum example from above.

(1..10).
  out_connect(trans_for :running_sum, 0).
  lazy.map{|x| x.to_s + "\n"}.
  out_connect($stdout.dup)

What does this do? It takes the sequence of integers from 1 to 10, then computes the running sum, then converts each partial sum to a string, and finally prints each string to $stdout. Except that the “thens” in the previous sentence are not entirely correct, since the processing stages run interleaved rather than one after the other (using coroutines where required, so blocking IO in one stage will block all other stages). Instead of (1..10), we could have a File and iterate over GBs of data, and at no point would we need to have the entire sequence in memory.

In the above example, the “laziness” of the pipeline - that is, the fact that we don't have to keep the complete sequence of values in memory at any stage - requires Enumerable#lazy. If we replace the lazy.map with a simple map, the complete sequence of strings will be stored in an intermediate Array.

[ Note that lazy enumerators were added in Ruby 2.0; the coroutines gem therefore depends on Charles Oliver Nutter's lazy_enumerator gem, which backports the feature to Ruby 1.8/1.9 ]
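The difference between map and lazy.map can be observed with built-in enumerables alone. The sketch below (plain Ruby 2.0 or later, no coroutines involved; trace is a hypothetical helper) records the order in which each stage sees each value. With strict map, every stage finishes before the next one starts; with lazy, each value travels through all stages before the next value is produced.

```ruby
# Hypothetical helper: run a small three-stage pipeline over enum and
# return a log of which stage handled which value, in order.
def trace(enum)
  log = []
  enum.map { |x| log << "double #{x}"; x * 2 }
      .map { |x| log << "format #{x}"; x.to_s }
      .each { |s| log << "print #{s}" }
  log
end

strict = trace(1..2)
# => ["double 1", "double 2", "format 2", "format 4", "print 2", "print 4"]

lazy = trace((1..2).lazy)
# => ["double 1", "format 2", "print 2", "double 2", "format 4", "print 4"]
```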

Any part of a pipeline can be passed around and stored as an individual object:

(1..10).
  out_connect(trans_for :running_sum, 0).
  lazy.map{|x| x.to_s + "\n"}
# => an Enumerator

trans_for(:running_sum, 0).
  lazy.map{|x| x.to_s + "\n"}.
  out_connect($stdout.dup)
# => a Consumer

trans_for(:running_sum, 0).
  lazy.map{|x| x.to_s + "\n"}
# => a Transformer

Note however that, while the last example works for map and some other common Enumerable methods, not all of the Enumerable API is implemented yet.

Connect operators

As explained above, the overloaded <= and >= operators are currently provided by default; but using them without explicitly requiring 'coroutines/operators' is deprecated.

'coroutines/operators' provides a short-hand notation for in_connect, out_connect and Enumerable#filter_map by overloading >= and <=:

open("test.txt", "w") <= ["a", "b", "c"]  # write "abc" to test.txt
["a", "b", "c"] <= ("d".."f")  # => ["a", "b", "c", "d", "e", "f"]
"abc" <= ("d".."f")  # => "abcdef"
enum = (1..10) >= trans_for(:running_sum, 0) >= proc{|x| x.to_s + "\n" }  # => an Enumerator
cons = trans_for(:running_sum, 0) >= proc{|x| x.to_s + "\n" } >= $stdout.dup  # => a Consumer
trans = trans_for(:running_sum, 0) >= proc{|x| x.to_s + "\n" }  # => a Transformer

where a Proc object in a pipeline is interpreted as if it were an argument to Enumerable#filter_map; i.e. the following two are equivalent:

(1..9) >= proc{|x| x.to_s if x.even? } >= ""
(1..9).lazy.filter_map{|x| x.to_s if x.even? }.out_connect("")

Apart from saving a few keystrokes (d'oh…), this has the advantage that all elements of a pipeline are lazy by default. When using map, filter and friends, forgetting to insert a “lazy” in the right place causes this part of the pipeline to become strict (but of course it may still produce the intended results!). This type of bug can be hard to catch - unless you're always testing with production-sized data sets.
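The filter_map semantics can be tried without the gem on Ruby 2.7 or later, where Enumerable#filter_map and Enumerator::Lazy#filter_map are built in: the block's result is kept when it is truthy and dropped when it is nil or false. The sketch below assumes such a Ruby version and uses each_with_object in place of out_connect to collect the results into a string.

```ruby
# Keep the string form of even numbers; odd numbers yield nil and are dropped.
evens = (1..9).lazy.filter_map { |x| x.to_s if x.even? }

evens.class  # => Enumerator::Lazy, nothing has been computed yet

# Drain the lazy enumerator into a string, one element at a time.
joined = evens.each_with_object(String.new) { |s, acc| acc << s }
# => "2468"
```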

Links and Acknowledgements

Inspired by David M. Beazley’s Generator Tricks (Python) and Michael Snoyman’s conduit package (Haskell).

Compatibility

Tested with MRI 1.9.3p484, 2.0.0p384 and 2.2.0dev (trunk 47827) on Linux. Should work on other moderately recent versions and other operating systems. Will probably not work on all alternative Ruby implementations, because it depends on the fiber library.

Requiring 'coroutines' does some monkey patching to various core classes, which may be a compatibility issue. It is possible to load just the core module (Sink) and classes (Consumer and Transformer) using “require 'coroutines/base'”. Obviously, you will then have to instantiate Consumer and Transformer manually in order to use them; and you'll have to be more explicit when constructing certain pipelines.

Patched core modules and classes are:

Enumerable

add Enumerable#filter_map, Enumerable#out_connect and Enumerable#>= operator

Enumerator::Lazy

add Enumerator::Lazy#filter_map

IO, Array, String

include Sink mixin

Hash

define Hash#<< operator and include Sink mixin

Method

define Method#<< operator, define Method#close as an alias for Method#receiver, and include Sink mixin

Object

define Object#await, Object#consum_for and Object#trans_for

Proc

define Proc#to_trans, Proc#<= and Proc#>=

Symbol

define Symbol#to_trans, Symbol#<= and Symbol#>=

Contributing

  1. Fork it on GitHub

  2. Create your feature branch (git checkout -b my-new-feature)

  3. Commit your changes (git commit -am 'Add some feature')

  4. Make sure unit tests pass (gem install --development coroutines; ruby tests/suite.rb)

  5. Push to the branch (git push origin my-new-feature)

  6. Create a new pull request