Expressに22個以上の要素の配列クエリを渡すときは気をつける

自作ライブラリを使ったBlueskyクライアントを実装していて遭遇したバグ。

ATriumからのfeed.getPostsでurisが21個までなら大丈夫だが22個以上だとエラーになることが発覚した。問題切り分け中… ここまで自作クライアント実装してようやく気付く問題があるんだから やっぱりドッグフーディング大事やな、、

— すぎゃーん (@sugyan.com) Apr 21, 2024 at 10:21 AM

問題の詳細と対応は以下の通り。

Expressで使っている qs が、[] を含むキーのときに配列として扱おうとするが、そのindexの大きさによって結果がarrayではなくobjectになったりする、というもの。 その境界のdefault値が20なので、順番にindexをつけているとuris[0]=...&urls[1]=...&uris[20]=... という 21個までなら問題ないが 22個になると突然objectとして扱われるようになってしまう、と。

www.npmjs.com

確かにindex指定したものを読んでくれるのは便利だが… そんな挙動があると知らずにrequestを送っているとハマる。

const qs = require("qs");

console.log(qs.parse("ids=foo&ids=bar"));
console.log(qs.parse("ids[]=foo&ids[]=bar"));
console.log(qs.parse("ids[0]=foo&ids[1]=bar"));
console.log(qs.parse("ids[1]=foo&ids[0]=bar"));
console.log(qs.parse("ids[20]=foo"));
console.log(qs.parse("ids[21]=foo"));
{ ids: [ 'foo', 'bar' ] }
{ ids: [ 'foo', 'bar' ] }
{ ids: [ 'foo', 'bar' ] }
{ ids: [ 'bar', 'foo' ] }
{ ids: [ 'foo' ] }
{ ids: { '21': 'foo' } }

わりとみんなハマっているようで困っている人も多そうではあるが、とにかくそういう仕様なようなので express なserverにリクエストを送るときは気をつけるしかなさそう。

github.com

BlueskyのDesktop Client Appを作り始めている

ATProtocolのRustライブラリを作っている 活動の続きとして、ライブラリの動作確認も兼ねてDesktop Applicationを作ってみることにした。

Tauri

RustでDesktop Application作成、といえば今もっとも普及しているのがTauriだろう。

tauri.app

ステータスとしてはMobile Application対応を含む v2のリリースに向けてBeta versionが公開されている、という状態のようだ。

Tarium

で、AT ProtocolのためのRustライブラリである ATrium をバックエンドで使ったTauri製のBluesky Client、ということで「Tarium」という名前にした。

github.com

Frontend

フロントエンドは各プラットフォームでのWebView上で動くわけで、結局HTMLやJavaScriptで用意することになる。 気合いでFull Rustで実装しようとすれば Yew などで構築することもできるようだったが、自分はまだ使ったことがなく学習が大変そうだったので、結局 Vite + React でTypeScriptで書くことにした。あとは React RouterTailwind

Backend

普通にWeb Applicationが作れるので、Bluesky Clientとしてはatproto公式のTypeScriptライブラリを使ってフロントエンドで完結するものも作れてしまう。

しかし自分の目的としては ATriumの動作確認なので、意地でもAT Protocolの処理はバックエンドにやらせることにしている。 結局バックエンドは command を介してフロントエンドからのXRPC Requestをproxyするだけ、になってしまいがちで 何のためにRustバックエンドが存在しているんだ… という感じにはなってしまうけど。

一応役割としては内部のStateとしてAtpAgentを持ってSession管理などをしている、くらい。単にproxyするだけだと悔しいので、feedの取得などはバックグラウンドタスクで定期的に実行して新しく表示すべきものをEventのemitによってフロントエンドに通知する、という方式にしている。まぁそれもフロントエンドでsetIntervalで実装するのとたいして変わらないんだろうけど…。

今のところPWAと比較して「Native Appならでは」というのはあまり無く、ようやくデスクトップ通知ができるようになった、くらいか? 今後はもしかしたらショートカットキーを使った操作などが実装できるとNativeならでは感がでるかもしれない。

型定義生成

ということで結局は書くコードの大半はTypeScriptになる。 そしてデータはバックエンドから取得するが IPC を介して結局JSONでやりとりすることになるので、Lexiconに沿った型にmappingさせて表示に使うにはどうしても型定義が必要になる。

最新のLexiconから対応する型定義などを生成したものが @atproto/api パッケージなどに含まれていて、公式appや他の多くのWeb Clientなどではこれを使っているはず。 Tariumのフロントエンドでもこれを素直に使えば良いが、この api パッケージにはXRPCのためのClient機能なども含まれていて、折角バックエンドでXRPC処理するようにしているのに本末転倒、というか「負けた」感じになってしまう…。

なので、意地でも @atproto/api は使わずに必要な型定義だけを得たい!ということで この api のためのコードを生成している @atproto/lex-cli をハックして必要なものだけを生成して使う、ということをした。

https://github.com/sugyan/tarium/tree/main/gen-types

現在の状況

v0.0.8 で、FollowingのタイムラインとCustom Feeds、あと通知がようやく表示できるようになって あとはテキストのみの単純な投稿ならできる、というくらい。まだまだ足りない機能だらけではあるが ちょっとずつ使えるようになってきている、とは思う。 自分が満足いくところまでは頑張って機能実装していくつもり。

実際、ここまで実装する過程でATriumがgetPreferencesを正しく取得できないバグに気付いたりできたので、当初の目的としては正しく達成できている。

Rustのserdeで、データフォーマットによって異なる型にserialize/deserializeする

背景

BlueskyのAT ProtocolのRust版ライブラリを作っている。

memo.sugyan.com

github.com

その中で最近実装した機能の話。

AT Protocolで扱うデータモデルのSpecは、以下のページで書かれている。

atproto.com

この中に、Lexiconでcid-linkという名前で扱われる型がある。

https://atproto.com/specs/lexicon#cid-link

つまりIPLDのLinkCIDで表現する型、ということのようだ。

で、これらのデータを扱うわけだが、そのデータフォーマットが2種類ある。 IPLDではデータ送信のためのCodecとして、binary formatのDAG-CBORと human-readable formatのDAG-JSONを定めている。

https://ipld.io/docs/codecs/

AT Protocolでは、効率的にデータを扱いたい場合にはDAG-CBORを用い、XRPCのHTTP APIなどではDAG-JSONとは異なる規約のJSONフォーマットを扱う、らしい。

で、cid-linkについては以下のように書かれている。

link field type (Lexicon type cid-link). In DAG-CBOR encodes as a binary CID (multibase type 0x00) in a bytestring with CBOR tag 42. In JSON, encodes as $link object

DAG-CBORでは、以下のようなバイナリ表現のCIDを含むバイト列、

0xd8, 0x2a, 0x58, 0x25, 0x00, 0x01, 0x71, 0x12, 0x20, 0x65, 0x06, 0x2a, 0x5a, 0x5a, 0x00, 0xfc,
0x16, 0xd7, 0x3c, 0x69, 0x44, 0x23, 0x7c, 0xcb, 0xc1, 0x5b, 0x1c, 0x4a, 0x72, 0x34, 0x48, 0x93,
0x36, 0x89, 0x1d, 0x09, 0x17, 0x41, 0xa2, 0x39, 0xd0,

JSONでは、以下のような $link という単一のキーを含むオジェクト、

{
  "$link": "bafyreidfayvfuwqa7qlnopdjiqrxzs6blmoeu4rujcjtnci5beludirz2a"
}

として表現されるらしい。

serde_json, serde_ipld_dagcbor

RustでJSONなどのデータフォーマットで Serialize/Deserialize する、となるとまず間違いなく serde を使うことになるだろう。 serde自体は Serialize/Deserialize を行うわけではなく、あくまでRustのデータ構造に対して汎用的にそれらを可能にするためのフレームワーク、という感じ。

ATriumではLexiconから生成された各XRPCに関連するデータの型をライブラリとして提供するので、それらの型に対して基本的にはserdeのattributesを付与するだけで、実際に何らかのデータフォーマットに対応した Seriazlier/Deserializer を使って変換操作をするのはライブラリのユーザ、ということになる。

実際のところ、JSONデータを扱うなら serde_json 一択だろう。 DAG-CBORについては、CBORデータを扱うことができるライブラリが幾つか存在しているが、2024-03時点でIPLDのLinkを正しく扱えるものとしては serde_ipld_dagcbor が現実的な選択肢になるようだった。

ので、この2つを使って実際に使われるデータに対して正しく Serialize/Deserialize できるようにする、ということを考える。

問題点: データフォーマットによって対象の型が異なる

JSONの場合/DAG-CBORの場合をそれぞれ独立して考えれば、構造に合わせて型を定義するだけなので簡単だ。

#[derive(Serialize, Deserialize, Debug)]
struct CidLink {
    #[serde(rename = "$link")]
    link: String,
}

fn main() {
    let cid_link_from_json = serde_json::from_str::<CidLink>(...);
    println!("{cid_link_from_json:?}");
    // => Ok(CidLink { link: "bafyreidfayvfuwqa7qlnopdjiqrxzs6blmoeu4rujcjtnci5beludirz2a" })
}
#[derive(Serialize, Deserialize, Debug)]
struct CidLink(cid::Cid);

fn main() {
    let cid_link_from_dagcbor = serde_ipld_dagcbor::from_slice::<CidLink>(...);
    println!("{cid_link_from_dagcbor:?}");
    // => Ok(CidLink(Cid(bafyreidfayvfuwqa7qlnopdjiqrxzs6blmoeu4rujcjtnci5beludirz2a)))
}

で、問題は両方に対応しようとする場合。ライブラリのユーザがJSONを使うか DAG-CBORを使うかどちらかだけであればまだ feature flags で切り替えるなどの対応が可能だが、「どちらも使う」というユースケースも考えられるので、

  • serde_json を使っている場合は $link キーを含むオブジェクト
  • serde_ipld_dagcbor を使っている場合は cid::Cid

として同じ CidLink という名前の型に情報を格納できるようにしたい。

最初の解決策: is_human_readable() による分岐

基本的には serde 自体は、呼ばれる Serializer/Deserializer についての情報を知ることができない。 が、 SerializeDeserialize を自分で実装すると、そのときに引数に含まれる serializerdeserializer に対して .is_human_readable() というメソッドを呼ぶことで一つ情報を得られる。 これは serde_json を使っていると true になり、 serde_ipld_dagcbor を使っていると基本的には false になるので、以下のように分岐させることで統一した CidLink で両方のデータフォーマットを扱うことができる。

#[derive(Debug)]
struct CidLink(Cid);

#[derive(Serialize, Deserialize)]
struct LinkObject {
    #[serde(rename = "$link")]
    link: String,
}

impl Serialize for CidLink {
    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        if serializer.is_human_readable() {
            LinkObject {
                link: self.0.to_string(),
            }
            .serialize(serializer)
        } else {
            self.0.serialize(serializer)
        }
    }
}

impl<'de> Deserialize<'de> for CidLink {
    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
        if deserializer.is_human_readable() {
            let obj = LinkObject::deserialize(deserializer)?;
            Ok(Self(
                Cid::try_from(obj.link.as_str()).map_err(serde::de::Error::custom)?,
            ))
        } else {
            Ok(Self(Cid::deserialize(deserializer)?))
        }
    }
}

これで解決、めでたしめでたし… といきたいところだが、そうもいかない。

うまくいかないケース

CidLink 単体が上手く処理されていても、それを子にもつ enum を "Internally tagged" や "Untagged" で区別しようとすると、問題が生じるようだ。

例えば、以下のようなもの。

#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "tag", rename_all = "lowercase")]
enum Parent {
    Foo(Child),
    Bar(Child),
}

#[derive(Serialize, Deserialize, Debug)]
struct Child {
    cid: CidLink,
}

これは、 "tag" キーで指定されたvariantとしてdeserializeを試みる。JSONでいうと

[
  {
    "tag": "foo",
    "cid": {
      "$link": "bafyreidfayvfuwqa7qlnopdjiqrxzs6blmoeu4rujcjtnci5beludirz2a"
    }
  },
  {
    "tag": "bar",
    "cid": {
      "$link": "bafyreidfayvfuwqa7qlnopdjiqrxzs6blmoeu4rujcjtnci5beludirz2a"
    }
  }
]

といった配列で渡されたとき、1つめの要素は Parent::Foo(Child) 、2つ目の要素は Parent::Bar(Child) としてdeserializeすることになる。

これと同様の構造を持つDAG-CBORのデータを serde_ipld_dagcbor でdeserializeすると (そもそもこういうケースでCidを含むものをdeserializeできないという問題 もあったが)

Error: Msg("invalid type: newtype struct, expected struct LinkObject")

といったエラーになってしまう。

deserializer.is_human_readable() で分岐しているところでdebug printしてみると分かるが、このような構造のデータをdeserializeするときは、 serde_ipld_dagcbor を使っていても .is_human_readable()true になってしまうらしい。 serde の細かい挙動を知らないけど、 Internally tagged や Untagged の場合は一度mapデータとして保持してからtagや内容を見て型を決定する必要があるため?そこから目的の型にマッピングする際に使われるdeserializerはまた別物になるらしく、 .is_human_readable() は意図したものにはならないようだ。 おそらくこのあたり。

なので、上述のように enum を使っている箇所の下では .is_human_readable() による分岐は機能しない。

解決策(?): Ipld を経由しデータの構造によって分岐する

serde_ipld_dagcbor という名前の通り、これは Ipld というデータモデルを利用することを想定されている。このデータモデルは(互換性どれくらいか把握できていないけれど) serde_json::Value と同じように構造化されたデータを保持できる。JSONには無い Link というものがある点でJSONの上位互換と考えても良いかもしれない。

ということで、deserializeしたいデータを一度 Ipld に変換してしまい、その構造を見てデータフォーマットを推定して分岐する、という手段をとった。

use libipld_core::ipld::Ipld;

impl<'de> Deserialize<'de> for CidLink {
    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
        let ipld = Ipld::deserialize(deserializer)?;
        match &ipld {
            Ipld::Link(cid) => {
                return Ok(Self(*cid));
            }
            Ipld::Map(map) => {
                if map.len() == 1 {
                    if let Some(Ipld::String(link)) = map.get("$link") {
                        return Ok(Self(
                            Cid::try_from(link.as_str()).map_err(serde::de::Error::custom)?,
                        ));
                    }
                }
            }
            _ => {}
        }
        Err(serde::de::Error::custom("Invalid cid-link"))
    }
}

少なくとも CidLink としてのデータであれば、 Ipld::deserialize(deserializer) は問題なく成功する。その結果は Ipld::Link か、 "$link"キーを含む Ipld::Map かどちらか、になるはずで、前者ならそのまま得られるCidを利用し、後者であればその "$link" の値からCidを復元する。

この手法であれば、 .is_human_readable() に依存せずに正しく判別でき、どちらのデータも同様にdeserializeできる。

fn main() {
    let parents_json = serde_json::from_str::<Vec<Parent>>(...)?;
    println!("{parents_json:?}");
    // => Ok([Foo(Child { cid: CidLink(Cid(bafyreidfayvfuwqa7qlnopdjiqrxzs6blmoeu4rujcjtnci5beludirz2a)) }), Bar(Child { cid: CidLink(Cid(bafyreidfayvfuwqa7qlnopdjiqrxzs6blmoeu4rujcjtnci5beludirz2a)) })])

    let parents_dagcbor = serde_ipld_dagcbor::from_slice::<Vec<Parent>>(...)?;
    println!("{parents_dagcbor:?}");
    // => Ok([Foo(Child { cid: CidLink(Cid(bafyreidfayvfuwqa7qlnopdjiqrxzs6blmoeu4rujcjtnci5beludirz2a)) }), Bar(Child { cid: CidLink(Cid(bafyreidfayvfuwqa7qlnopdjiqrxzs6blmoeu4rujcjtnci5beludirz2a)) })])
}

今回のようなケースでしか機能しないかもしれないが、一応これで問題は解決した。

(他にもっと良い方法をご存知の方がいれば教えてください…)

汎用的? な解決策: Untagged

一般的には、このように場合によって異なる型にdeserializeしたい場合は Untagged なenumを使うのが良いのかもしれない。

#[derive(Serialize, Deserialize)]
#[serde(untagged)]
pub enum CidLink {
    Raw(Cid),
    LinkObject {
        #[serde(rename = "$link")]
        link: String,
    },
}

serde(untagged) の場合はvariantを順番に試し、最初にdeserializeに成功したものを結果として返す。 ので、上の例の場合はまず Cid 単体としてdeserializeしてみて、失敗したら "$link" キーを持つオブジェクトとしてdeserializeしてみる、という動作になる。記述の順序を変えれば試行の順序も変化する。

ベンチマーク

上述した"Untagged"の場合は、記述の順序が大事になる。上の例の通りだとJSONのデータは毎回最初にCidとしてdeserialize試行して失敗してようやくオブジェクトとして試行、となり効率が悪い。しかし順序を逆にすると今度はDAG-CBORのデータを処理する際に毎回オブジェクトとして試行して失敗して…となる。 今回は対象が2種類だけなので差は小さいかもしれないが、これが何種類もあると…。

その点ではIpldを経由する手法の方が、中間の変換処理は入るが安定した効率は期待できる。

当然ながら、JSONなら最初からオブジェクトとして DAG-CBORなら最初からCidとしてdeserializeするのが最も効率的で速い。 それぞれを基準として、「Raw→LinkObjectのuntagged (untagged_1)」「LinkObject→Rawのuntagged (untagged_2)」「Ipld経由 (via_ipld)」のそれぞれのdeserializeをベンチマークとってみた。

running 8 tests
test bench_cbor_only       ... bench:          59 ns/iter (+/- 1)
test bench_cbor_untagged_1 ... bench:          83 ns/iter (+/- 2)
test bench_cbor_untagged_2 ... bench:         172 ns/iter (+/- 8)
test bench_cbor_via_ipld   ... bench:          63 ns/iter (+/- 1)
test bench_json_only       ... bench:          77 ns/iter (+/- 2)
test bench_json_untagged_1 ... bench:         276 ns/iter (+/- 4)
test bench_json_untagged_2 ... bench:         134 ns/iter (+/- 9)
test bench_json_via_ipld   ... bench:         325 ns/iter (+/- 6)

DAG-CBORに関しては、 untagged_2 がやはり毎回LinkObjectの試行の後になるので3倍ほど遅くなってしまう。一方で via_ipld はほぼ同等の速度で処理できているようだ。

JSONに関しては、どれも大きく遅くなるようだ。意外にも untagged_2 でも2倍くらい遅くなる…。via_ipld はcidのparse処理も入るので当然ながら最も遅くなってしまう、という結果だった。

実装結果

ということで、

  • どうしてもJSONだけを扱うときと比較すると遅くなる
  • そもそもXRPC RequstにはJSONしか使わない
    • DAG-CBORが必要になるのはSubscriptionなどrepoデータを読むときのみ

ということもあって、dag-cbor featureを有効にしたときのみ、Ipldを経由する方式で両方のデータフォーマットに対応するようにした。

その後

この実装をした後、 types::string::Cid という型が導入されて、Cidの文字列表現であるものはこの型でvalidationするようになった。LinkObjectのものも値は String ではなくこの types::string::Cid を使うべきで、そうなるともはやJSONの速度差もそんなに気にしても仕方ない感じになってくる。

running 8 tests
test bench_cbor_only       ... bench:          59 ns/iter (+/- 0)
test bench_cbor_untagged_1 ... bench:          78 ns/iter (+/- 3)
test bench_cbor_untagged_2 ... bench:         169 ns/iter (+/- 3)
test bench_cbor_via_ipld   ... bench:          63 ns/iter (+/- 3)
test bench_json_only       ... bench:         227 ns/iter (+/- 4)
test bench_json_untagged_1 ... bench:         426 ns/iter (+/- 6)
test bench_json_untagged_2 ... bench:         288 ns/iter (+/- 5)
test bench_json_via_ipld   ... bench:         324 ns/iter (+/- 6)

もはや feature flags での切り替えは廃止して、必ずIpldを経由する方式にしてしまって良いかもしれない。

Cloudflare Workersで、自分のはてブをBlueskyに流す

bsky.app

そういえば、古き良き時代は自分のブックマークは自動でTwitterに投稿されていたのだった。 今はBlueskyがメインになっているので、同じ仕組みが欲しい、と思った。ので、作った。

github.com

要件

自分のブックマークはRSSで取得できる。定期的にチェックして新しいのがあれば、といったロジックで検出できる。 なので、基本的にはプログラムを定期実行できる場所があればGitHub Actionsとかでも良い。 ただ、対象のブクマ内容をpostする前に、それを既にpostしているか否かを知る必要がある。

専用のbotアカウントとかであれば、そのアカウントのpost feedを取得して最近のものをチェックすれば確認可能だが、自分のアカウントに流す場合はその方法だと手動で大量にpostしたりしているタイミングだと埋もれてしまう。 よって、何らかの方法でpost済みのものを保持しておく必要がある。

先行事例

で、良いなと思ったのがこれ。

github.com

Cloudflare Workersでも Cron Triggers で定期実行できて、また Workers KV のようなものでデータを保存しておくことができる。 上述要件を満たすプラットフォームとしてはちょうど良さそう。

Rust版

前述のをそのまま使わせていただいても良かったのだけど、折角 ATProtocolのRustライブラリを作っている のだし、Rustで同じような機能のものを作ってみることにした。

WASM対応

まず拙作 ATrium を使ってみようとしたところ、早速ビルドに失敗した。 あまり考えていなかったが、 wasm32-unknown-unknown のtargetでビルドしようとすると失敗する箇所が多々あった。

ので、まずはそこから。

github.com

主に問題は async-trait を使っているところで、これが Send を要求するため wasm32 targetの場合に問題になるようだった。 これは (?Send) を追加することで回避できるとのこと。 wasm32 targetのときだけこれを追加する形に変更した。他のtargetの場合は何も影響を受けない。

#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]

あとは atrium-xrpc-client という専用の非同期HTTP Clientを幾つか提供していたが、それらは reqwest backendのもの以外はすべてwasm32には対応していなかったので、諸々変更して wasm32 の場合は reqwest のClientだけが利用可能になるようにした。

Cloudflare Workersでの実装

以前の記事 でも書いた通り、Cloudflare WorkersはRust環境でも作成することができる。今回のものもJavaScript/TypeScriptを1行も書かずに作成できた。

Cron Triggers用に #[event(scheduled)] をつけた関数を実装するだけ。

1MB制限との戦い

うっかり色んなライブラリを依存に入れていると、ビルド後のサイズが無料枠上限の1MBを超えてしまう。

RSSの解析に最初は feed-rs を使っていたが、ちょっと大きいようだったのでもっとシンプルにRSS 1.0のものだけ扱える rss に変更したりした。

Fetch API

ローカル実行する時の感覚で reqwest を、コンテンツ取得や atrium-xrpc-client のバックエンドとして使っていたが、実は必要なかった… worker-rs には worker::Fetch があり、これを使うことでHTTPリクエストを処理できる。

ATriumでは、非同期HTTP Clientとしてはどんなものを使っても良いようにTraitだけ用意して開発者が自由にClientを実装・選択できるように設計していた。 ので、今回は worker::Fetch を使うように少し繋ぐ部分を書くだけでXRPC Clientとして問題なく利用できるようになった。

github.com

ただ、本来このFetch APIを使って Transform images もできるはずなのだが、どうもRust版はまだ対応できていないっぽい。

Add support for the `cf.image` field in `fetch()` properties by jakubadamw · Pull Request #351 · cloudflare/workers-rs · GitHub

無理矢理やればできるのかな… とはいえOGPの画像はわりと変換かけずにそのまま uploadBlob しても失敗することは少なそう?なのでしばらく様子見でいいかな。

KVでのSessionStore?

ちなみに atrium-api ではセッション情報を管理する AtpAgent というのも用意していて、ここでは SessionStore も自分で実装することで認証情報を保持しておくことができるように設計していた。 通常は memoryに保持するだけの MemorySessionStore を利用するが、今回のようなケースで KV を使ってそこに保持しておくことができないか? と試みたが、やはり Send trait要求の壁があり上手くいかなかった… これはちょっと対応するのも難しいかなぁ。

そもそも今回のように難しい並列処理とか無いような場合はわざわざsession保持しておかなくても都度ログイン(createSession)する、で問題なさそうではある。

Rustで将棋の局面画像生成、そしてCDN Edgeで動的生成

背景

先行・類似事例

その他、自分でも過去にGoで作成したものがあったが、もう今はメンテしていなくて動かない。画像素材を公開してくださっていたサイトも消滅してしまっている。

自作のメリット

  • 自分好みの画像を生成できる
    • 形式もまずはPNGで、SVGも後で追加するなど可能かも
  • Rustのプログラムから使えるライブラリとして存在していると嬉しい
    • WASM化してWebアプリ化もしやすいかもしれない

Rustで局面画像生成

基本的には、透過PNGの素材を組み合わせて盤上に駒を配置するだけ。

あとは持ち駒の表現が様々な表示方法があるが、歩は最大18枚なので重ねて並べるのは微妙で、素直に個数を数字としてレンダリングした方が分かりやすいと思う。

盤・駒画像の素材

最近では Electron将棋 のKubo, Ryosukeさんが使いやすい形で素材画像を公開してくださっている。

sunfish-shogi.github.io

ので、これを使わせていただくことにした。 二文字駒もあると嬉しかったが、一時期追加されていたものの諸事情で削除されてしまったようだ。 今後 誰かがオリジナルで作成してここに追加してくれたりするようになるといいな…。

画像処理

image というライブラリが画像処理によく使われているようだ。 PNG以外にも様々な画像形式に対応していて、拡大縮小や重ね合わせなどの基本的な処理も簡単にできる。

作成時にはPNG画像を合成する操作だけなので、もっと軽いライブラリで同様のものが実現できるならそちらの方が良いかもしれない。とりあえずはimageで実装してみた。

入出力

入力は局面の情報を持つものとして shogi_corePartialPosition を使うことにした。出力はimageRgbaImage

ライブラリ使用者は必要に応じて例えば shogi_usi_parse を使ってSFEN文字列からPartialPositionを作り、それを元に生成した結果のRgbaImageを加工したり好きな形式でファイルに書き出したりすれば良い、という想定。

pub fn pos2img(position: &shogi_core::PartialPosition) -> image::RgbaImage {
    Generator::default().generate(position)
}

Generatorと下準備

局面画像を作成する際の計算負荷が減るよう、Geenratorという構造体を用意した。 スタイルを指定してnewした時点で必要な駒などの素材データを読み込んでおき、generateを呼ばれた際にはそれらを貼り合わせるだけ、にする。 これによって、同じスタイルで複数の局面を生成する場合には、素材の読み込みが一度で済んで高速に生成できるようになる、はず。

盤と駒の画像は事前にサイズを決めておき、それぞれ切り出した上で最適化したPNGファイルとして置いておく。ファイル読み込みはせず、include_bytes!でメモリ上から読み込んで使う。

Publish

というわけで出来上がったので crates.io に公開した。 ご自由にお使いください。

Web Appで使う

せっかくライブラリとして作ったので、ちょっと加工してWeb Appとしても使えると嬉しいかもしれない、と思って作ってみることにした。

局面を表現するSFEN文字列は lnsgkgsnl/1r5b1/ppppppppp/9/9/9/PPPPPPPPP/1B5R1/LNSGKGSNL b - 1 のように表現される。これをURLのPathとして受け取って、対応する局面画像を返すようなものを考えた。

例:

https://example.com/3sks3/9/4S4/9/1+B7/9/9/9/9%20b%20S2rb4g4n4l18p%201

スペースは %20エンコードする必要があるのでちょっとカッコ悪いが…。

CDN Edgeで動かす

最近よく聞くようになったEdge Computingをまだ試したことがなかったので、この機会に色々触ってみることにした。

wasm-packでWebAssembly作成

まずは簡単なインタフェースを決めて、WebAssemblyで使えるようにパッケージを用意した。 簡単に扱えるように、入力はSFEN文字列としshogi_usi_parseでそれをparseし局面情報を取得、出力は生成した画像のPNG形式バイナリデータとした。

use shogi_img::{image, pos2img, shogi_core};
use shogi_usi_parser::FromUsi;
use std::io::Cursor;
use wasm_bindgen::prelude::*;

#[wasm_bindgen]
pub fn sfen2png(sfen: &str) -> Vec<u8> {
    let pos = shogi_core::PartialPosition::from_usi(sfen).unwrap_or_default();
    let mut cursor = Cursor::new(Vec::new());
    pos2img(&pos)
        .write_to(&mut cursor, image::ImageFormat::Png)
        .ok();
    cursor.into_inner()
}

あとは wasm-pack でビルドすればいい感じにwasmファイルが生成される。

Deno Deploy

上記のwasmを使って最も簡単に実現できたのが、Deno Deploy だった。 wasm-pack build --target deno するとDeno用のwasmファイルが生成され、あとはそれを読み込んで使うだけ。

import { sfen2png } from "./pkg/sfen2png.js";

Deno.serve((req: Request) => {
  const sfen = `sfen ${decodeURI(new URL(req.url).pathname).slice(1)}`;
  return new Response(sfen2png(sfen), {
    headers: {
      "content-type": "image/png",
    },
  });
});

これだけのコードを書いて、あとはdeployすれば動く。簡単すぎてすごい。

$ curl -I https://sfen2img.deno.dev/
HTTP/2 200
content-type: image/png
vary: Accept-Encoding
date: Wed, 24 Jan 2024 14:39:32 GMT
content-length: 447458
via: http/2 edgeproxy-h
server: deno/gcp-asia-northeast1

Vercel Edge Functions

同様にwasmを読み込んで使えるものとして、Vercel Edge Functions を試してみた。

ドキュメントには wasm-pack を使った例が無いようで、他の人が作っているexample repositoryなどを参考にしてどうにか。 wasm-pack build --target web で生成し、Next.jsで/src/app/[[...slug]]/route.ts で使う。

// @ts-ignore
import wasm from "@/pkg/sfen2png_bg.wasm?module";
import init, { sfen2png } from "@/pkg/sfen2png.js";

export const runtime = "edge";
export const dynamic = "force-dynamic";

export async function GET(request: Request) {
  await init(wasm);
  const sfen = `sfen ${decodeURI(new URL(request.url).pathname).slice(1)}`;
  return new Response(sfen2png(sfen), {
    headers: {
      "content-type": "image/png",
    },
  });
}

とりあえずこれでローカルでは動いた。deployしてみようとしたが

Error: The Edge Function "[[...slug]]" size is 1.62 MB and your plan size limit is 1 MB.

となってしまった。wasmだけで1.3MBくらいあり、Hobbyプランでは1MBまで という制限を超えてしまうようだ。 Pro以上のプランにするか、もっと軽いバイナリになるように工夫が必要になる…。

Cloudflare Workers

よく名前を聞くので是非試してみたかった、Cloudflare Workers

wrangler でプロジェクトを作成する際に worker-rust のテンプレートを使うと、Rustで worker を使うRustプロジェクトが生成される。

wrangler generate [name] https://github.com/cloudflare/workers-sdk/templates/experimental/worker-rust

そして lib.rs を以下のように編集するだけ。

use sfen2png_wasm::sfen2png;
use urlencoding::decode;
use worker::*;

#[event(fetch)]
async fn main(req: Request, _env: Env, _ctx: Context) -> Result<Response> {
    let decoded = decode(&req.path())
        .map(String::from)
        .expect("decode failed");
    let sfen = format!("sfen {}", decoded.strip_prefix('/').expect("invalid path"));
    Ok(Response::from_bytes(sfen2png(&sfen))?
        .with_headers(Headers::from_iter([("content-type", "image/png")])))
}

JavaScriptdecodeURI同等のものは標準には無いようなので urlencoding ライブラリを使っている。

あとは wrangler deploy するだけ。その中で worker-build によってCloudflare Workers用に諸々ビルドされるようで、worker-buildが内部でwasm-packを実行したりしているようだ。 Rustだけで完結し、JS/TSのコードを書く必要がない。wranglerのためにnpmを使うくらい。

Cloudflare WorkersもVercel Edge Functionsと同様に Free planでは 1 MB まで という制限があるが、こちらは size after compression ということで事前にgzip圧縮して送信してくれるおかげで660KB程度になり、制限に引っかかることなくdeploy成功する。

Total Upload: 1414.62 KiB / gzip: 659.91 KiB
$ curl -I https://sfen2img.sugyan.workers.dev/
HTTP/2 200
date: Thu, 25 Jan 2024 06:16:53 GMT
content-type: image/png
content-length: 447458
report-to: {"endpoints":[{"url":"https:\/\/a.nel.cloudflare.com\/report\/v3?s=cT5XwLyJxYgiWXTJ85RHOGIRSxlpJSnl%2F6iPDoQW097set8BelY7M%2Fc6mRULGktwSqBU2Jlv6UoJ3w6eYFDM4bBjNHmZPGvfrnipB8u4Fxmkc3T%2BjFVUyF80dEFkxsn5X1Lp9OzTRhytCWAf%2BLA%3D"}],"group":"cf-nel","max_age":604800}
nel: {"success_fraction":0,"report_to":"cf-nel","max_age":604800}
server: cloudflare
cf-ray: 84ae63d57d908370-KIX
alt-svc: h3=":443"; ma=86400

Fastly Compute@Edge

最後に Cloudflare Workersと並んでよく名前を聞く、Fastly Compute@Edge

こちらは fastly というCLIツールを使う。

$ fastly compute init

...

Language:
(Find out more about language support at https://developer.fastly.com/learning/compute)
[1] Rust
[2] JavaScript
[3] Go
[4] Other ('bring your own' Wasm binary)

このあたりから選べそう。今回はRustで。Cloudflare Workersと同様にRustプロジェクトが生成されるので、同じようにmain.rsを編集していく。

use fastly::http::header;
use fastly::{Error, Request, Response};
use sfen2png_wasm::sfen2png;
use urlencoding::decode;

#[fastly::main]
fn main(req: Request) -> Result<Response, Error> {
    let decoded = decode(req.get_path())?;
    let sfen = format!("sfen {}", decoded.strip_prefix('/').expect("invalid path"));
    Ok(Response::from_body(sfen2png(&sfen)).with_header(header::CONTENT_TYPE, "image/png"))
}

Request/Responseなどのインタフェースは worker とは多少異なるが、おおまかには同じなのでこれくらいの内容であればほぼ同じような感覚で書ける。

あとは fastly compute publish するだけ。こちらは wasm-pack なども使わない。

$ curl -I https://sfen2img.edgecompute.app/
HTTP/2 200
content-type: image/png
x-served-by: cache-nrt-rjtf7700062-NRT
date: Thu, 25 Jan 2024 07:26:11 GMT

その他

Edge Computing系のサービスは他にも多くあるようだが、今回はこれくらいで。 大抵はJSからwasmを呼び出す形でVercel Edge Functionsと同様に動かせる、と予想している。

サイズ制限は厳しい場合が多いので、.wasmが1MBを切る程度には軽量できた方が望ましそうではある…。

まとめ

局面画像生成ライブラリを作ったおかげで、詰将棋画像を投稿するBluesky BotをRustだけで作ることができた。

bsky.app

スクレイピングして得たkifファイルをparseして局面情報を取得する部分も 自作ライブラリ を使っているので、

  • kifから PartialPosition への変換
  • PartialPosition から画像生成
  • 生成画像を含むPostをBlueskyに投稿

の3つを自作のライブラリを使って実現していることになる。

副産物として、Edgeで動的生成するWeb Appが複数できた。 無料枠でそのまま置いておくので、使える限りはご自由にお使いください。

Repository

Advent of Code 2023 を完走した

毎年12月に開催されている Advent of Code に、2019年から参加している。

過去記事:

2023年のAdvent of Codeにも挑戦していて、年が明けてしまったが先日ようやく25日すべての問題に解答して 50 個のスターを集めることができた。

bsky.app

2023年は例年と比較しても難易度が高いものが多かったように思う(英語のストーリーを読解しきれない、のは毎年のことだが)。 いちおう「他の人の解答を見たりせずにまずは自力で回答する」という目標は達成してとりあえずの自力完答はできたが、正直どうするのが正しかったのか分からん…というものも多かった。

github.com

これから Reddit での議論を見たり他の言語でも解いてみたりして、もうちょっと勉強しておこうと思っているところ。

問題の概要と解いた感想

ネタバレしない程度に。

day01

行の文字列の中から数値(数値)を見つける。

1日目の小手調べにしてはpart2がちょっと面倒だった

day02

3色の石の数を推測するゲーム。

正しく入力をparseして処理できれば特に問題ない

day03

2次元の図ではあるが横に並ぶ数字は数値として見る必要がある、というもの。

数値と記号の位置関係をうまく判定できれば、という感じ

day04

クラッチカードの得点計算。

part2はAoCらしいメタな感じでちょっと頭を使う面白い問題

day05

数値の範囲から別の範囲へのマッピング、を繰り返す。

急に難易度が上がって大変だった…!

day06

チャージ時間を調整してボートが進める距離を予測する。

入力のフォーマットが不自然だったのは意味があったか…

day07

トランプのポーカーの役の強さでソートする。

またしてもpart2の展開を予測できなかった

day08

2分岐の迷路を指定された方向に無限に進み続ける。

part2がやはり少し難しく。入力が上手く作られていることに救われている感じ

day09

数列から差分を使って次の値を予測する。

スッと解けると気持ち良い

day10

2次元grid内でループする経路を検出。

part2はだいぶ苦労した… 理論を知っていればもっと簡単だったのだろうか

day11

膨張し続ける宇宙空間で銀河間の距離を求める。

これはpart2はこうなるだろうな、というのが予想できた

day12

イラストロジック的なもの。

そろそろ探索の実装も一筋縄ではいかなくなってくる

day13

2次元grid上で鏡写しになっている境界を見つける。

簡単な方法を自力では思いつけなくて悔しい

day14

2次元grid上で丸い石を一方向に動かして詰めていく。

今年は2次元gridモノが多いな…。

day15

HashとHashMapの独自実装。

工夫のしどころが見つからなかった…

day16

光の反射と分裂シミュレーション。

また2次元grid… しかしこういうのはvisualizationが面白そう

day17

動き方に制約のある最短経路探索。

制約ひとつで色んな考え方が生まれて面白いなぁ

day18

命令の通りに線を引いて囲んだ面積を求める、というもの。

AoCではよく出てくるタイプのもの、かな? すごい苦労したが…

day19

4つの値に対して分岐していくworkflowに流し込んで結果を得る。

どんどん実装が大変になってきた…!

day20

状態を保持する回路を使い、通るpulseのシミュレーション

しばらく悩んで色々ためして一応どうにか解を出せたが… という感じ

day21

2次元grid迷路内で、偶数歩で辿り着き得る範囲は。

これはめっちゃ難しくて悩みまくった… 何度も試行錯誤して解は出せたが

day22

3次元空間でジェンガを積んでいく。ブロックを取り除くとどうなる?

かなり考え込んでようやくどうにか解けた、、という感じ

day23

2次元gridでの一筆書き(同じ点を二度と通らない)最長経路。

計算は大変だったけどどうにか

day24

3次元空間内で移動する座標たち…!

おそらく2023で一番の難問だったと思う、しかし解けるとスッキリする。。

day25

グラフを3辺切断して2つのグループに分ける。

最小カット問題、というのか?アルゴリズム知らなくてちょっと泥臭いやり方で解いた

自動でtokenをrefreshして再送するRustの非同期HTTP Client

半年ほど前から、BlueskyのAT ProtocolのRust版ライブラリを作っている。 memo.sugyan.com

github.com

その中で最近実装した機能の話。

API Agent

本家の atproto (TypeScript実装)に AtpAgent というものがある。

この AtpAgent は、(Blueskyだけに限らない) AT Protocol のための汎用的なエージェントとして提供されている。その機能の一つとしてtokenの管理機能がある。

AT Protocolの認証

少なくとも 2023/11 時点では HTTP Bearer auth でJWTを送信することで認証を行う方式となっている。

com.atproto.server.createSession でログイン成功すると accessJwtrefreshJwt などが含まれる認証情報が返ってくるので、その accessJwt をBearer tokenに使って各エンポイントにアクセスする。

また、 com.atproto.server.refreshSession というエンドポイントがあり、ここを refreshJwt をtokenに使って叩くことでtokenを更新することができる。

tokenの管理と自動更新機構

で、 @atproto/api で提供されている AtpAgent はそれらのtokenの管理を自動で行う機能を持っている。

export class AtpAgent {
  ...
  session?: AtpSessionData

  /**
   * Internal fetch handler which adds access-token management
   */
  private async _fetch(...): Promise<AtpAgentFetchHandlerResponse> {
    ...
    // wait for any active session-refreshes to finish
    await this._refreshSessionPromise
    // send the request
    let res = await AtpAgent.fetch(...)
    // handle session-refreshes as needed
    if (isErrorResponse(res, ['ExpiredToken']) && this.session?.refreshJwt) {
      // attempt refresh
      await this._refreshSession()
      // resend the request with the new access token
      res = await AtpAgent.fetch(...)
    }
    return res
  }
}

まず現時点でもっているsession情報を元にリクエスト処理を試み、そのレスポンスが expired のエラーだったときのみそれを検出してtokenをrefreshして同じ内容のリクエストを再送する。最初のリクエストが成功していた場合はそのままそのレスポンスを返す。 という動き。

ATriumでの実装

で、これと同様の仕組みを持つ AtpAgent をATriumでも実装しようと考えた。

XrpcClient trait

ATriumでは、AT ProtocolのXRPCリクエストを送るための XrpcClient というtraitを定義している。

#[async_trait]
pub trait HttpClient {
    async fn send_http(
        &self,
        request: Request<Vec<u8>>,
    ) -> Result<Response<Vec<u8>>, Box<dyn std::error::Error + Send + Sync + 'static>>;
}

pub type XrpcResult<O, E> = Result<OutputDataOrBytes<O>, self::Error<E>>;

#[async_trait]
pub trait XrpcClient: HttpClient {
    fn base_uri(&self) -> String;
    #[allow(unused_variables)]
    async fn auth(&self, is_refresh: bool) -> Option<String> {
        None
    }
    async fn send_xrpc<P, I, O, E>(&self, request: &XrpcRequest<P, I>) -> XrpcResult<O, E>
    where
        P: Serialize + Send + Sync,
        I: Serialize + Send + Sync,
        O: DeserializeOwned + Send + Sync,
        E: DeserializeOwned + Send + Sync,
    {
        ...
    }
}

XRPC は規約に沿ったHTTPリクエスト/レスポンスでしかないので、内部としてはHTTPの処理をすることになる。 が、RustではHTTPリクエスト/レスポンスを処理するための標準ライブラリのようなものはなく、特に非同期の場合は reqwestIsahcSurf のような3rd partyのライブラリが多く使われている、と思う。

ATriumではこれらのライブラリをバックエンドとして開発者が選択できるよう、HTTP部分を抽象化する HttpClient というtraitを定義し、それを継承した XrpcClient を実装したインスタンスを内部に持つ AtpServiceClient が各XRPCを処理する形にしている。

XrpcClient::send_xrpc() はデフォルト実装を持っており、HttpClient::send_http() さえ実装されていれば、あとはそれを使ってリクエストに使う入力のserializeや返ってきたレスポンスJSONのdeserializeなどの処理を行うようになっている。

session管理するwrapper

認証については XrpcClient のメソッドとして async fn auth(&self, is_refresh: bool) -> Option<String> を定義しているだけなので、ここでどのようなtokenを返すかはtrait実装者に任される。

インメモリで管理する場合、内部で Arc<RwLock<Option<Session>>> のように持っておいて管理することで、マルチスレッドで共有されていても安全に扱えるようになる。

(参考: https://github.com/usagi/rust-memory-container-cs)

なので、内部で XrpcClient を実装したものを持つWrapperを作り、主な処理はそれに移譲して auth() だけを実装することで、保持しているsession情報からtokenを返す XrpcClient 実装を作ることができる。

use std::sync::Arc;
use tokio::sync::RwLock;

struct Wrapper<T>
where
    T: XrpcClient + Send + Sync,
{
    inner: T,
    session: Arc<RwLock<Option<Session>>>,
}

#[async_trait]
impl<T> HttpClient for Wrapper<T>
where
    T: XrpcClient + Send + Sync,
{
    async fn send_http(
        &self,
        request: Request<Vec<u8>>,
    ) -> Result<Response<Vec<u8>>, Box<dyn std::error::Error + Send + Sync + 'static>> {
        self.inner.send_http(request).await
    }
}

#[async_trait]
impl<T> XrpcClient for Wrapper<T>
where
    T: XrpcClient + Send + Sync,
{
    fn base_uri(&self) -> String {
        self.inner.base_uri()
    }
    async fn auth(&self, is_refresh: bool) -> Option<String> {
        self.session.read().unwrap().as_ref().map(|session| {
            if is_refresh {
                session.refresh_jwt.clone()
            } else {
                session.access_jwt.clone()
            }
        })
    }
}

ここでは非同期の RwLock として tokio::sync を使っている。

これで、AtpAgent も同じsessionを共有し、例えばログイン成功時に取得したsession情報を書き込む、といったことをすれば、その後のリクエストでその値が使われるようになる。

struct AtpAgent<T>
where
    T: XrpcClient + Send + Sync,
{
    api: Service<Wrapper<T>>,
    session: Arc<RwLock<Option<Session>>>,
}

impl<T> AtpAgent<T>
where
    T: XrpcClient + Send + Sync,
{
    fn new(xrpc: T) -> Self {
        let session = Arc::new(RwLock::new(None));
        let api = Service::new(Wrapper {
            inner: xrpc,
            session: Arc::clone(&session)
        });
        Self { api, session }
    }
    async fn login(&self, ...) {
        // login process
        let session: Session = ...;
        self.session.write().await.replace(session);
    }
}

tokenの自動更新 (失敗例)

で、これを使ってtokenの自動更新を行うためには、XrpcClient::send_xrpc() をオーバーライドする形で実装すれば良さそう。

impl Wrapper<T>
where
    T: XrpcClient + Send + Sync,
{
    async fn refresh_session() {
        // refresh process
        let session: Session = ...;
        self.session.write().await.replace(session);
    }
    fn is_expired<O, E>(result: &XrpcResult<O, E>) -> bool
    where
        O: DeserializeOwned + Send + Sync,
        E: DeserializeOwned + Send + Sync,
    {
        if let Err(Error::XrpcResponse(response)) = &result {
            if let Some(XrpcErrorKind::Undefined(body)) = &response.error {
                if let Some("ExpiredToken") = &body.error.as_deref() {
                    return true;
                }
            }
        }
        false
    }
}

impl<T> XrpcClient for Wrapper<T>
where
    T: XrpcClient + Send + Sync,
{
    fn base_uri(&self) -> String {
        ...
    }
    async fn auth(&self, is_refresh: bool) -> Option<String> {
        ...
    }
    async fn send_xrpc<P, I, O, E>(&self, request: &XrpcRequest<P, I>) -> XrpcResult<O, E>
    where
        P: Serialize + Send + Sync,
        I: Serialize + Send + Sync,
        O: DeserializeOwned + Send + Sync,
        E: DeserializeOwned + Send + Sync,
    {
        let result = self.inner.send_xrpc(request).await;
        // handle session-refreshes as needed
        if Self::is_expired(&result) {
            self.refresh_session().await;
            self.inner.send_xrpc(request).await
        } else {
            result
        }
    }
}

self.innersend_xrpc() を移譲し、その結果を判定して Expired のエラーであった場合のみ self.refresh_session() を呼び出してtokenを更新し、再度同じ send_xrpc() を呼び出す、という形。

self.refresh_session() が成功して内部のsessionが書き変わっていれば、再度同じリクエストを送ったときにはtokenが更新されているので成功する。

…と思ったが、実際にはこれは思った通りには動かない。 Rustのtrait実装はインスタンスのメソッドをオーバーライドしているわけではなく、あくまでtraitのメソッドを実装しているだけなので、 self.inner に移譲したメソッドは self とは別のインスタンスとして扱われる。

つまり self.inner.send_xrpc() のデフォルト実装中で呼ばれる self.auth() はあくまで self.inner に実装されている auth() であり、Wrapper に実装されている auth() は呼ばれない。なので、いくら Wrapper の内部でsessionを更新しても self.inner には関係がない、ということになる。

2重のwrapperで解決

これをどうやって解決するかしばらく悩んだが、結局wrapperをもう一つ作って2重にsessionを共有することで想定した動きをするようになった。

主な処理を移譲された inner が同じsession情報を持って使ってくれていれば問題ないので、元のwrapperを RefreshWrapper として、同じように XrpcClient を実装する SessionWrapper を作り、 auth()self.session を参照する機能だけをそちらに持たせるようにする。

struct RefreshWrapper<T>
where
    T: XrpcClient + Send + Sync,
{
    inner: T,
    session: Arc<RwLock<Option<Session>>>,
}

#[async_trait]
impl<T> HttpClient for RefreshWrapper<T>
where
    T: XrpcClient + Send + Sync,
{
    ... // (use inner)
}

#[async_trait]
impl<T> XrpcClient for RefreshWrapper<T>
where
    T: XrpcClient + Send + Sync,
{
    async fn send_xrpc<P, I, O, E>(&self, request: &XrpcRequest<P, I>) -> XrpcResult<O, E>
    where
        P: Serialize + Send + Sync,
        I: Serialize + Send + Sync,
        O: DeserializeOwned + Send + Sync,
        E: DeserializeOwned + Send + Sync,
    {
        let result = self.inner.send_xrpc(request).await;
        // handle session-refreshes as needed
        if Self::is_expired(&result) {
            self.refresh_session().await;
            self.inner.send_xrpc(request).await
        } else {
            result
        }
    }
}

struct SessionWrapper<T>
where
    T: XrpcClient + Send + Sync,
{
    inner: T,
    session: Arc<RwLock<Option<Session>>>,
}

#[async_trait]
impl<T> HttpClient for SessionWrapper<T>
where
    T: XrpcClient + Send + Sync,
{
    ... // (use inner)
}

#[async_trait]
impl<T> XrpcClient for SessionWrapper<T>
where
    T: XrpcClient + Send + Sync,
{
    ...

    async fn auth(&self, is_refresh: bool) -> Option<String> {
        self.session.read().await.as_ref().map(|session| {
            if is_refresh {
                session.refresh_jwt.clone()
            } else {
                session.access_jwt.clone()
            }
        })
    }
}

そして、 AtpAgent では XrpcClient を実装したものとして RefreshWrapper<SessionWrapper<T>> を使うようにする。両者間で session を共有するために Arc<RwLock<Option<Session>>> を渡している。

struct AtpAgent<T>
where
    T: XrpcClient + Send + Sync,
{
    api: Service<RefreshWrapper<SessionWrapper<T>>>,
    session: Arc<RwLock<Option<Session>>>,
}

impl<T> AtpAgent<T>
where
    T: XrpcClient + Send + Sync,
{
    fn new(xrpc: T) -> Self {
        let session = Arc::new(RwLock::new(None));
        let api = Service::new(Arc::new(RefreshWrapper {
            inner: SessionWrapper {
                inner: xrpc,
                session: Arc::clone(&session),
            },
            session: Arc::clone(&session),
        }));
        Self { api, session }
    }
}

send_xrpc() 内でのexpire検出と更新・再送の処理は RefreshWrapper で行われるが、実際の送信は内部の SessionWrapper に移譲されることになる。 SessionWrapper では共有されている self.sessionauth() 内で参照する動作をするので、 RefreshWrapper (や、それを使う AtpAgent 本体) で更新されたsession情報がそのまま使われることになる。

これでtokenの自動更新が実現された。

並行処理での同時更新の問題

もう一つ、TypeScript実装の AtpAgent が持っている機能として、複数の refreshSession() が同時に来ても1回しか実行されないように制御する、というものがある。

  /**
   * Internal helper to refresh sessions
   * - Wraps the actual implementation in a promise-guard to ensure only
   *   one refresh is attempted at a time.
   */
  private async _refreshSession() {
    if (this._refreshSessionPromise) {
      return this._refreshSessionPromise
    }
    this._refreshSessionPromise = this._refreshSessionInner()
    try {
      await this._refreshSessionPromise
    } finally {
      this._refreshSessionPromise = undefined
    }
  }

内部状態として Promise<void> | undefined を持っており、 _refreshSession() が呼ばれた時点でそれが undefined であれば Promise<void> をセットした上で実際にその処理を await し、既に Promise<void> がセットされている場合はそれを返す、という動き。

もしAgentが並行して複数のAPIをほぼ同時に叩いたときに、tokenがexpiredだった場合はその結果がほぼ同時に返ってくることになる。その時点ではtokenがrefreshされていないので、それぞれがほぼ同時に自動refresh処理を行うことになるが、実際には1回だけrefreshされていれば良いので、そのように制御するための仕組み、と考えられる。

そもそも同じrefresh tokenを使って複数回refreshのリクエストすると、既に使用されたrefresh tokenが無効になり2回目以降のrefreshはエラーになる可能性もあり(ここはサーバ実装次第と思われる、現時点でのblueskyでは特に何も起こらず新しいtokenが発行されるだけ、のようだ)、非同期で並行処理され得る環境ではこういった制御は必要になる。

…しかしRustでは同じように実装しようとしても上手くいかなかった。 async なinnerを呼んだ返り値を保持するとなると Pin<Box<impl Future<Output = Result<...>> + 'static>> のような形になり、しかしこれを Mutex とかで保持しようとしてもこれは Send ではないので Mutex に入れられない、など… 何度もコンパイルエラーに悩まされて挫折した。 Rustの非同期に詳しい人なら上手く実装できるんだろうか…

Notify による制御実装

ChatGPTと長々と議論し、結局 tokio::sync にある MutexNotify を使うことで同様の動作を実現させた。

use tokio::sync::{Mutex, Notify};

struct RefreshWrapper<T>
where
    T: XrpcClient + Send + Sync,
{
    inner: T,
    session: Arc<RwLock<Option<Session>>>,
    is_refreshing: Mutex<bool>,
    notify: Notify,
}

impl<T> RefreshWrapper<T>
where
    T: XrpcClient + Send + Sync,
{
    ...

    async fn refresh_session(&self) {
        {
            let mut is_refreshing = self.is_refreshing.lock().await;
            if *is_refreshing {
                drop(is_refreshing);
                return self.notify.notified().await;
            }
            *is_refreshing = true;
        }
        self.refresh_session_inner().await;
        *self.is_refreshing.lock().await = false;
        self.notify.notify_waiters();
    }
}

まず self.is_refreshing で、refresh実行中のものがあるか否かを保持する。これは Mutex で管理されるので、 最初にロックを取得して true に変更したものが完了してまた false に戻すまでは他の処理からの読み取り結果は必ず true になる。 そして、その true に変更できたものだけが、後続の実際の更新処理である self.refresh_session_inner() を実行する。

self.is_refreshingtrue になっている間は、 self.refresh_session_inner() が終わったあとに呼ばれる self.notify.notify_waiters() によって完了を知らされるまで待機するだけ、という形になる。 こういった他のスレッドからの通知を待つための仕組みとして tokio::sync::Notify があるようだ。この場合は完了したことを知りたいだけなので Notify だが、処理結果も知りたい場合は oneshot などを使うと良いのかもしれない。

ライブラリとしては特定の非同期ランタイムに依存するようにはしたくないので tokio などを使うのは避けたかったが、標準ライブラリや futures などには同様の仕組みがないようだったので仕方なく tokio を使うことにした。実際のところ sync featureで使うこれらのものは特にランタイム依存は無いようで、 async-std など別の非同期ランタイムで実行しても問題なく動作するようではあった。

まとめ

ライブラリの依存は増えてしまったが、どうにか AtpAgent として実装したい機能は実現できた。Rustむずかしい…。

他にもっと良い方法をご存知の方がいればpull-requestなどいただけると嬉しいです。