class Object

Constants

COLNAMES
COLTYPES
DAY
Multicast

Convenience alias for Sink::Multicast. It is not defined when only coroutines/base is required.

Public Instance Methods

await → obj

Evaluates to the next input value from the associated consumption context. In order to create a consumption context, you have to use #consum_for or #trans_for on the method calling await.

The behavior of await is undefined if the method using it is called without being wrapped by #consum_for or #trans_for. Doing so should be an error, just as it is an error to use yield without an associated block; however, since await is not a language primitive (as yield is), this cannot always be enforced. This is likely to change in a future version if a better solution can be found.

# File lib/coroutines.rb, line 121
def await
        yielder = Fiber.current.instance_variable_get(:@yielder)
        raise CoroutineError, "you can't call a consumer" if yielder.nil?
        yielder.await
end
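
The suspend-and-resume behavior described above can be illustrated with a plain Fiber. This is only a sketch of the general idea, not the library's implementation: sketch_await is a hypothetical stand-in for await, and the fiber is driven by hand instead of through #consum_for.

```ruby
# sketch_await is a hypothetical stand-in for await; it is NOT part of
# the coroutines library.
def sketch_await
  # Suspend the current fiber. The value passed to the next resume
  # becomes the result of Fiber.yield, and therefore of sketch_await.
  Fiber.yield
end

received = []
fiber = Fiber.new do
  loop { received << sketch_await }
end

fiber.resume       # run the body up to the first sketch_await
fiber.resume(:a)   # sketch_await evaluates to :a
fiber.resume(:b)   # sketch_await evaluates to :b

p received
```

Each resume hands one value to the suspended body, which is the role that consumer << obj plays for a real Consumer.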
bytes_transferred() { |total| ... }
# File examples/parse_apache.rb, line 42
def bytes_transferred
        total = 0
        loop do
                total += await["bytes"]
                yield total
        end
end
consum_for(method, *args) → consumer

Creates a new Consumer coroutine from the given method. This is analogous to using Kernel#enum_for to create an Enumerator instance. The method is called immediately (with the given args). It executes until the first call to await, at which point #consum_for returns the Consumer instance. Calling consumer << obj resumes the consumer at the point where it last executed await, which evaluates to obj.

Calling consumer.close raises StopIteration at the point where the consumer last executed await; it is expected that it will terminate without executing await again. Consumer#close evaluates to the return value of the method.

Example:

def counter(start)
    result = start
    loop { result += await }
    "Final value: #{result}"
end

co = consum_for :counter, 10  #=> #<Consumer: main:counter (running)>
co << 10 << 1000 << 10000
co.close  #=> "Final value: 11020"
# File lib/coroutines.rb, line 156
def consum_for(meth, *args)
        cons = Consumer.new do |y|
                Fiber.current.instance_variable_set(:@yielder, y)
                send(meth, *args)
        end
        description = "#{inspect}:#{meth}"
        cons.define_singleton_method :inspect do
                state = if @fiber.alive? then "running" else @result.inspect end
                "#<Consumer: #{description} (#{state})>"
        end
        cons
end
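
The consumer lifecycle described above (run to the first await, resume on <<, stop on close) can be approximated with a plain Fiber. This is a minimal sketch under stated assumptions: SketchConsumer, its STOP sentinel and SketchConsumer.await are hypothetical names invented for illustration, and the real Consumer class may work differently.

```ruby
# Hypothetical illustration only; SketchConsumer is not the library's Consumer.
class SketchConsumer
  STOP = Object.new  # sentinel used by close to end the body

  # Stand-in for await: suspend until the next value arrives, and raise
  # StopIteration when the consumer is being closed.
  def self.await
    value = Fiber.yield
    raise StopIteration if value.equal?(STOP)
    value
  end

  def initialize(&body)
    @fiber = Fiber.new(&body)
    @fiber.resume              # run the body up to its first await
  end

  # Resume the body; its pending await evaluates to value.
  def <<(value)
    @fiber.resume(value)
    self                       # allow chaining: co << 1 << 2
  end

  # End the body and return its final value.
  def close
    @fiber.resume(STOP)
  end
end

co = SketchConsumer.new do
  result = 10
  loop { result += SketchConsumer.await }  # loop rescues StopIteration
  "Final value: #{result}"
end

co << 10 << 1000 << 10000
final = co.close
puts final
```

Because Kernel#loop rescues StopIteration, the body falls through to its last expression when closed, which is why close can return the method's result.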
dump(label)
# File examples/parse_apache.rb, line 64
def dump(label)
        loop do
                puts "#{label}: #{await}"
        end
end
find_404() { |%w{status datetime request}| ... }

Reads log entry Hashes; for each log entry with status 404, yields a String containing the entry's status, datetime and request, separated by spaces.

# File examples/parse_apache.rb, line 33
def find_404
        loop do
                r = await
                if r["status"] == 404
                        yield %w{status datetime request}.map{|col| r[col].to_s}.join(" ")
                end
        end
end
follow(path) { |readline| ... }
# File examples/parse_apache.rb, line 70
def follow(path)
        file = open(path)
        loop do
                begin
                        yield file.readline
                rescue EOFError
                        sleep 0.1
                end
        end
        file.close
end
parse() { |log_entry| ... }

Reads log lines in the following format: host referrer user [timestamp] “GET|POST request protocol” status bytes … For each parsable line, yields a Hash with the keys given by COLNAMES and the values parsed according to COLTYPES.

# File examples/parse_apache.rb, line 21
def parse
        loop do
                if await =~ /(\S+) (\S+) (\S+) \[(.*?)\] "(\S+) (\S+) (\S+)" (\S+) (\S+)/
                        log_entry = $~.captures.zip(COLNAMES, COLTYPES).
                                map{|val, name, type| [name, val.send(type)] }.
                                out_connect(Hash.new)
                        yield log_entry
                end
        end
end
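
To see what a parsed entry might look like, the matching and conversion step can be run on a single line outside any coroutine. This is a sketch under assumptions: the column names and types below are hypothetical (the actual COLNAMES and COLTYPES are not shown here; only the status, datetime, request and bytes keys appear in the other examples), the sample log line is made up, and to_h stands in for out_connect(Hash.new).

```ruby
# Hypothetical column definitions; the real COLNAMES/COLTYPES may differ.
colnames = %w{host referrer user datetime method request protocol status bytes}
coltypes = [:to_s, :to_s, :to_s, :to_s, :to_s, :to_s, :to_s, :to_i, :to_i]

# Made-up sample line in the format described above.
line = '127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /missing.html HTTP/1.0" 404 209'

entry = nil
if line =~ /(\S+) (\S+) (\S+) \[(.*?)\] "(\S+) (\S+) (\S+)" (\S+) (\S+)/
  # Pair each capture with its column name and conversion method, then
  # build a Hash (to_h is used here in place of out_connect(Hash.new)).
  entry = $~.captures.zip(colnames, coltypes).
    map { |val, name, type| [name, val.send(type)] }.
    to_h
end

p entry["status"]   # Integer, because of :to_i
p entry["request"]
```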
parse_dpkg_status() { |strip, strip| ... }

Transformer that accepts lines of text and

  • appends any lines starting with a space to the previous line

  • parses each set of non-empty concatenated lines as a key:value pair and yields that pair as an Array

# File examples/dpkgstatus-bench.rb, line 11
def parse_dpkg_status
        line = await
        loop do
                accum = ""
                begin
                        begin
                                accum += line
                                line = await
                        end while line.start_with? " "
                ensure
                        key, value = accum.split(":", 2)
                        yield [key.strip, value.strip] if accum != "\n"
                end
        end
end
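
For comparison, the same folding of continuation lines can be sketched without coroutines, using Enumerable#slice_before on an in-memory Array. This is a different technique from the transformer above, shown only to make the intended input and output concrete; the sample lines are made up.

```ruby
# Made-up input in the style of a dpkg status file.
lines = [
  "Package: foo\n",
  "Description: short\n",
  " long part\n",
  "\n",
  "Version: 1.0\n",
]

pairs = lines.
  slice_before { |l| !l.start_with?(" ") }.  # new record at each non-continuation line
  map(&:join).                               # append continuation lines to their record
  reject { |rec| rec == "\n" }.              # skip blank separator lines
  map { |rec| key, value = rec.split(":", 2); [key.strip, value.strip] }

p pairs
```

Unlike this eager version, the transformer processes one line at a time and never needs the whole input in memory.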
request_rate() { |to_f| ... }
# File examples/parse_apache.rb, line 51
def request_rate
        last = await["datetime"]
        loop do
                n = 0
                begin
                        this = await["datetime"]
                        n += 1
                end while this == last
                yield n.to_f / (this - last).to_f / DAY
                last = this
        end
end
trans_for(method, *args) → transformer

Creates a new Transformer instance that wraps the given method. This is analogous to using Kernel#enum_for to create an Enumerator instance, or using #consum_for to create a Consumer instance. The method is not executed immediately. The resulting Transformer can be connected to an enumerable (using transformer.in_connect(enum)) or to a sink (using transformer.out_connect(sink)). When the transformer method is started depends on how it is connected; in every case, however, the method is called with the args given to trans_for. See Transformer for details.

Within the transformer method, await can be used to read the next input value (as in a Consumer, compare #consum_for), and yield can be used to produce an output value (i.e., when it is called, the method is given a block which accepts its output).

Example:

def running_sum(start)
  result = start
  loop { result += await; yield result }
end

tr = trans_for :running_sum, 3  #=> #<Transformer: main:running_sum>
sums = (1..10).out_connect(tr)  #=> #<Enumerator: #<Transformer: main:running_sum> <= 1..10>
sums.to_a  #=> [4, 6, 9, 13, 18, 24, 31, 39, 48, 58]
# File lib/coroutines.rb, line 198
def trans_for(meth, *args)
        trans = Transformer.new do |y|
                Fiber.current.instance_variable_set(:@yielder, y)
                send(meth, *args, &y.method(:yield))
        end
        description ="#{inspect}:#{meth}"
        trans.define_singleton_method :inspect do
                "#<Transformer: #{description}>"
        end
        trans
end
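
The calling convention described above (input via await, output via a block passed to the method) can be imitated by hand with a Fiber. This is a sketch only: sketch_await is a hypothetical stand-in for await, and the fiber is fed manually rather than through a Transformer connected with in_connect or out_connect.

```ruby
# sketch_await is a hypothetical stand-in for await, not a library method.
def sketch_await
  Fiber.yield
end

# Same shape as the running_sum example, but using the stand-in.
def running_sum(start)
  result = start
  loop { result += sketch_await; yield result }
end

outputs = []
fiber = Fiber.new do
  # The method is called with its args plus a block that accepts output.
  running_sum(3) { |value| outputs << value }
end

fiber.resume                                 # run up to the first sketch_await
(1..10).each { |item| fiber.resume(item) }   # feed the input values one by one

p outputs
```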
unwrap(header=0x61, footer=0x62, escape=0xAB) { |frame| ... }

Example shamelessly stolen from eli.thegreenplace.net/2009/08/29/co-routines-as-an-alternative-to-state-machines/

# File examples/frames.rb, line 6
def unwrap(header=0x61, footer=0x62, escape=0xAB)
        loop do
                byte = await
                next if byte != header

                frame = ""
                loop do
                        byte = await
                        case byte
                        when footer
                                yield frame
                                break
                        when escape
                                frame += await.to_s(16)
                        else
                                frame += byte.to_s(16)
                        end
                end
        end
end
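
A hand-driven run can make the framing rules concrete. The sketch below repeats the logic of unwrap with a hypothetical sketch_await in place of await, so that it runs without the library, and feeds it a made-up byte sequence containing one frame with an escaped footer byte.

```ruby
# sketch_await and sketch_unwrap are hypothetical stand-ins used only so
# this example runs without the coroutines library.
def sketch_await
  Fiber.yield
end

def sketch_unwrap(header = 0x61, footer = 0x62, escape = 0xAB)
  loop do
    byte = sketch_await
    next if byte != header        # skip everything outside a frame

    frame = ""
    loop do
      byte = sketch_await
      case byte
      when footer
        yield frame               # frame complete
        break
      when escape
        frame += sketch_await.to_s(16)  # take the next byte literally
      else
        frame += byte.to_s(16)
      end
    end
  end
end

frames = []
fiber = Fiber.new { sketch_unwrap { |frame| frames << frame } }
fiber.resume  # run up to the first sketch_await

# noise, header, data, escaped footer byte, data, footer, noise
[0x00, 0x61, 0x11, 0xAB, 0x62, 0x22, 0x62, 0x33].each { |b| fiber.resume(b) }

p frames
```

The escaped 0x62 ends up inside the frame as "62", while the unescaped one terminates it.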