AIタイトルアシストなら大袈裟でも恥ずかしくない!
n月刊ラムダノートVol.4, No.1の記事を読むぞ
「手を動かして学ぶストリーム処理入門」でKafkaの気持ちを理解したくなりました。
でもKafkaを使うのはめんどくさいので全部Rubyで書いてみようと思います。
実験用のデータ
github.com
githubに実験用のデータが置いてある。親切だ!
ヘッダつきタブ区切りのテキストファイルに気象情報が書いてある。
本文を読み進めると、タブ区切りのまま使わないでJSON風のマップに変換してるようだ。
何度もデータ形式を変換する処理があるのが興味深い。
結局のところ元の情報の表現(カラムの順序)を知っている人が作るんだからタブ区切り(あるいはArray)のままでもいい気がする。
そこは本質じゃないので1ターン目にオブジェクトにしてHashに入れることにした。
require 'pp'
require 'time'
class TSV
def self.load_file(fname, parser)
stream = File.foreach(fname).lazy.map {|x| x.chomp.split("\t")}
head = stream.next
self.new(head, stream, parser)
end
def initialize(cols, stream, parser)
@cols = cols
@stream = stream
@parser = parser
end
def next
[@cols, @parser.zip(@stream.next).map {|m, v| m.call(v)}].transpose.to_h
end
end
if __FILE__ == $0
tsv = TSV.load_file('dataset-amedas/20220101-20220131-kushiro.tsv',
[Time.method(:parse), method(:Float), method(:Float)])
while (it = tsv.next rescue nil)
pp it
end
end
こんな感じの出力になる
{"timestamp"=>2022-01-01 00:00:00 +0900,
"temperature [°C]"=>-8.2,
"rainfall [mm]"=>0.0}
{"timestamp"=>2022-01-01 01:00:00 +0900,
"temperature [°C]"=>-8.6,
"rainfall [mm]"=>0.0}
{"timestamp"=>2022-01-01 02:00:00 +0900,
"temperature [°C]"=>-9.3,
"rainfall [mm]"=>0.0}
...
なお、TSVは外部イテレータ風にした。
replaymanを真似する
実験用のツールにreplaymanというのがあるみたい。
元データの発生時刻(頻度、間隔)を再現しながらデータを出力する係で、再生開始時刻や早送りも設定できる。
似たようなものを作ろう
class Reply
def initialize(tsv, timed_by, speed=1, start=nil)
@tsv = tsv
@start = start
@speed = speed
@timed_by = timed_by
end
def sleep_until(at)
sleep([at - Time.now, 0].max)
end
def run
origin = Time.now
while (rec = @tsv.next rescue nil)
t = rec[@timed_by]
@start ||= t
diff = t - @start
sleep_until(origin + (diff / @speed))
yield(rec)
end
end
end
if __FILE__ == $0
src = TSV.load_file('dataset-amedas/20220101-20220131-kushiro.tsv',
[Time.method(:parse), method(:Float), method(:Float)])
reply = Reply.new(src, 'timestamp', 3600)
reply.run {|x| pp x}
end
(実行しないとわからないと思うが)3600倍速でデータが印字されるぞ!
開始時刻を指定するのがめんどうだったので、最初のデータの時刻をデフォルト値にした。
ちなみにReplyは内部イテレータぽくした。うーむ。なんで私はこれがいいと思ったのだろうか。あとでまた考えよう。
Driqでストリームぽくする
DriqはThe dRuby Bookに書いたDripから永続化を省いた消費されないキューだ。Drip - persistent + queue。
Driqについてはn月刊ラムダノートの既刊に掲載されているので金で買って読んで欲しい。
あとRubyKaigi TakeoutでDriqの話をしてました。(忘れてた)
rubykaigi.org
dRubyを使えば複数プロセス/マシンでDriqとクライアントを分割できるが、分散できるのは知っているので今日はモノプロセス/マルチスレッドで書くことにする。
require 'pp'
require 'time'
require 'driq'
src = TSV.load_file('dataset-amedas/20220101-20220131-kushiro.tsv',
[Time.method(:parse), method(:Float), method(:Float)])
r = Reply.new(src, 'timestamp', 3600)
s_source = Driq.new
Thread.new(s_source) do |sin|
cursor = 0
while true
cursor, value = sin.read(cursor)
pp value
end
end
r.run {|x| s_source.write(x)}
Replyしたデータをトピックに相当するs_sourceにwriteして、別スレッドでreadして印刷するだけのものです。
気温が0度以上のものだけが流れるs_non_negもやってみる
require 'pp'
require 'time'
require 'driq'
src = TSV.load_file('dataset-amedas/20220101-20220131-kushiro.tsv',
[Time.method(:parse), method(:Float), method(:Float)])
r = Reply.new(src, 'timestamp', 36000)
s_source = Driq.new
s_non_neg = Driq.new
Thread.new(s_source, s_non_neg) do |sin, sout|
cursor = 0
while true
cursor, value = sin.read(cursor)
sout.write(value) if value['temperature [°C]'] >= 0
end
end
Thread.new(s_non_neg) do |sin|
cursor = 0
while true
cursor, value = sin.read(cursor)
pp value
end
end
r.run {|x| s_source.write(x)}
n月刊ラムダノート Vol.2, No.1(2020)(電子書籍のみ)www.lambdanote.com
付き合いで送ろう!m_seki's wishlist
今年はERB/dRubyが25周年なので!
https://www.amazon.co.jp/hz/wishlist/ls/1R43BBPSPUEEE/