超簡単にオモチャ LWT を実装してみた

OCaml は現時点でマルチコア対応じゃないので、マルチスレッドにしてもマルチコアの恩恵を享受することができません。ですので、OCaml で thread を使う旨みというのは、関数を並行に走らせる事ができるってことだけです。でも並行に走らせる事が出来ればいろいろ便利ですよね。

でもマルチスレッドプログラミングで一番困るのが排他制御。これを上手くやってやらないと、OCaml で type safe なはずなのに、 segfault という事になります。もちろん OCaml プログラマは普段から type safety に慣れきっているので、 type system で保障されない排他制御を完璧に書けるはずがないのです。心が折れます。

そこで、light weight thread。こいつは実は thread じゃありません。要は同一スレッド内でタスク関数をとっかえひっかえ asynchronous に走らせると言うだけで、ホントは並行じゃないのですが、とっかえひっかえする事で、thread を使わないのに thread っぽい事ができます。Thread じゃないので排他制御いりません。入出力など、時間がかかる所を上手く一箇所で管理して select してやるスケジューラを書けばできちゃいます。もちろん本当の thread じゃないので、各タスクはスケジューラに協調してやる必要があり、あまり時間のかかる仕事をやってスケジューラに制御を戻すのを忘れているようなタスクを書いてはいけません。

目標は、できるだけ実装を簡単にしてどう動くかを理解する。だけ。タスク選択の faireness とか、全く考えずに、とにかく仕組みを理解するためだけのコードを短く書きます。

基本のデータ型から

type time = float (** in seconds *)

module Thunk = struct
  (** Jobs are stored as Thunk.t *)
    
  type t = unit -> unit

  let run t = 
    (* prerr_endline "run"; *)
    t ()

end

Thunk.t は遅延されているタスクです。条件が満たされると実行されるわけですね。

遅延タスクの種類

次の三種類を考えます

  • Ready: いつでも実行可能なタスク
  • Timed: ある時間が来たら実行可能なタスク
  • Fd: あるファイルハンドルが読み/書き可能になったら実行可能なタスク

この三つについて管理モジュールを書きましょう。

いつでも実行可能なタスクのキュー

実行可能タスクキューを Ready というモジュールで定義しています。Queue のインスタンスを作っているだけです。

module Ready : sig
  type t (** ready thunk store : we can execute them at anytime *)

  val create : unit -> t
    
  val add : t -> Thunk.t -> unit

  val take : t -> Thunk.t
    (** Take one of thunks from [t]. If none, raise Not_found *) 

end = struct

  type t = Thunk.t Queue.t

  let create = Queue.create
  let add t thunk = Queue.add thunk t
  let take t = try Queue.take t with Queue.Empty -> raise Not_found
end

タイマー待ちタスクのヒープ

タイマー待ちのタスク集合です。本当は予定時間でソートされたヒープを使いたいのですが、、、OCaml の stdlib にはヒープが無い!! なんてこった。ヒープを実装するのは面倒なので、Set でやっています。とにかく簡単に、簡単に。

module Timed : sig
  type t (** timed thunk store *)

  val create : unit -> t

  val add : t -> time -> Thunk.t -> unit
    (** register a thunk which should be executed after time *)
    (** 時間指定したタスクを足す *)

  val get_expired : t -> time -> (time * Thunk.t) list
    (** get the thunks whose timers have expired *)
    (* 時間が過ぎた実行すべきタスクを取り出す *)

  val next_expiration : t -> time option
    (** the next earliest timer expiration time / or none *)
    (* 次のタイマーイベント *)

end = struct

  (* Timed schedules. It should be implemented using Heap, but
     heap is not available in the OCaml standard library. 
     What the h#@$! Instead we use Set here. *)

  module Set = 
    Set.Make(struct 
      type t = time * Thunk.t
      let compare ((at1 : time), _) ((at2 : time), _) = compare at1 at2
    end)

  type t = Set.t ref

  let create () = ref Set.empty

  let add t at v = t := Set.add (at, v) !t

  let get_expired t now = 
    let expired, not_yet = Set.partition (fun (at, _) -> at <= now) !t in
    t := not_yet;
    Set.elements expired

  let next_expiration t = try Some (fst (Set.min_elt !t)) with Not_found -> None
end

ファイルハンドルで待っているタスクのテーブル

各ファイルハンドル、読み、書きに対応して、待っているタスクのテーブルです。(読み書き、ハンドル) をキーにした Hashtbl で実装しています。キーに関連付けられたデータは Thunk.t list です。(Hashtbl.add を繰り返して multi-binding にしても良いのですが、判りにくいと思ったので、そうはしていません。)

さて、ここでも Thunk.t list でいいんかいな、という問題が出てきますが、気にしない。

module Fd : sig
  type t (** fd polling thunk store *)

  val create : unit -> t

  val add : t -> [ `Read | `Write ] -> Unix.file_descr -> Thunk.t -> unit
    
  val wait_fds : t -> Unix.file_descr list * Unix.file_descr list
    (** list up file descriptors for read and write to select *)

  val take : t -> ([ `Read | `Write ] * Unix.file_descr) -> Thunk.t
    (** [take t (rw, fd)] takes one of the thunks registered 
	  for given (rw, fd) from [t] and return it *)

end = struct
  module Hashtbl_list = struct
    (** Hashtbl with list *)
  
    open Hashtbl
    type ('a, 'b) t = ('a, 'b list) Hashtbl.t
  
    let create = create
  
    let find t k = try Hashtbl.find t k with Not_found -> []
  
    let add t k v = Hashtbl.replace t k (v :: find t k)
  
    let to_alist tbl =
      (* 空は除外しないと thunk の無い fd で待ってしまう *) 
      List.filter (function (_, []) -> false | _ -> true)
        (Hashtbl.fold (fun k v acc -> (k,v)::acc) tbl [])
  
    let replace t k = function
      | [] -> Hashtbl.remove t k
      | vs -> Hashtbl.replace t k vs
  end

  type t = ([ `Read | `Write ] * Unix.file_descr, Thunk.t) Hashtbl_list.t

  let create () = Hashtbl_list.create 101

  (** [add] adds the thunk at the head *) 
  (* [(rw, fd)] に [thunk] を登録 *) 
  let add t rw fd thunk = Hashtbl_list.add t (rw,fd) thunk

  let to_alist = Hashtbl_list.to_alist

  (* [thunk] が登録された key を列挙 *)
  let wait_fds t : Unix.file_descr list * Unix.file_descr list = 
    List.fold_left (fun (fds_read, fds_write) ((rw,fd),_) -> 
      match rw with
      | `Read -> fd::fds_read, fds_write
      | `Write -> fds_read, fd::fds_write)
      ([],[]) (to_alist t)

  (** [take] simply gets the head. 
	For simplicity, [add] and [take] think no faireness is considered. *)  
  (* key に対応する thunk を一つ取り出す。取り出し方がダサいのでスケジューリングが不公平だが気にしない。 *)
  let take t rw_fd =
    match Hashtbl_list.find t rw_fd with
    | [] -> assert false (* スケジューラがちゃんとしていれば空のはずは無い。ここに来たらバグ *)
    | x::xs -> Hashtbl_list.replace t rw_fd xs; x
end

スケジューラ

上の三つのタスク管理を合わせて一つにし、肝心の select を行う所です。

module Scheduler_gen : sig
    
  type t (* スケジューラの型 *)
  
  val create : unit -> t
    (** Create a scheduler *)

  val run : t -> unit
    (* スケジューラに溜まったタスクを実行する *)
    (** Run the schedules stored in [t] *)

  (** scheduling functions *)
    
  val whenever : t -> Thunk.t -> unit
    (** Run the thunk whenever it is possible *)
    (* Ready キューに足す *)

  val timed : t -> time -> Thunk.t -> unit
    (** Run the thunk after the [time] comes *)
    (* Timed ヒープに足す *)

  val fd : t -> [`Read | `Write] -> Unix.file_descr -> Thunk.t -> unit
    (** [fd t rw fd thunk] runs [thunk] when the file descriptor [fd]
       is ready for reading/writing([rw]) *)
    (* fd テーブルに足す *)

end = struct
  
  (** the scheduler *)    
  type t = {
    ready : Ready.t; (** thunks ready to run *) 
    timed : Timed.t; (** timed thunks *)
    fd : Fd.t; (** fd polling thunks *)
  }

  (** [ready_timed t] take all the timed thunks whose timers are
      expired already and add them to the ready queue. 
      Then it returns the next earliest timer expiration time.
  *)
  (* 現時点で実行すべき timed タスクを Ready キューに足す。
     Unix.time は一秒刻みの精度でダサいが気にしない。
     次回 timer の expiration までの時間を返す *)
  let ready_timed t =
    (** We use [Unix.time], which serves only second granularity.
	It is sad, but it is simple. *)
    let now = Unix.time () in
    let t_jobs = Timed.get_expired t.timed now in
    List.iter (fun (_,thunk) -> Ready.add t.ready thunk) 
      (List.rev t_jobs);
    match Timed.next_expiration t.timed with 
    | Some t -> Some (t -. now), t_jobs <> []
    | None -> None, t_jobs <> []
  ;;
  
  (** [ready_fd t next_time_out] polls over the fds where fd thunks
      are waiting and if one of them is ready, take it and add to
      the ready queue. The polling times out in [next_time_out] secs
      (if None, no time out). After the polling, the expired timed
      thunks are also added to the ready queue. 

      If there is something added, the function returns [true].
      Otherwise, if nothing to wait, returns [false].
  *)
  (* poll すべきファイルハンドるを調べて、それらが準備できるまで待つ。
     next_time_out で時間切れを設定できる。

     準備ができた、もしくはタイムアウトした場合は、Fd, Timed それぞれで
     実行可能なタスクを Ready キューに足して。[true] を返す。

     全く待つ必要が無い場合は [false] を返す。
  *)
  let ready_fd t next_time_out =
    let fds_read, fds_write = Fd.wait_fds t.fd in
    
    match fds_read, fds_write, next_time_out with
    | [], [], None -> false (* End of all the jobs *)
    | _ ->
        let next_time_out = 
          match next_time_out with
          | Some t -> t
          | None -> -1.0 (* forever *)
        in
        let fds_ready =
          let fds_readable, fds_writable, _ = 
            Unix.select fds_read fds_write [] next_time_out
          in
          List.map (fun x -> `Read, x) fds_readable 
          @ List.map (fun x -> `Write, x) fds_writable
        in
        let _, _ = ready_timed t in
        match fds_ready with
        | [] -> true (* timer *)
        | fds_ready::_ ->
            (* Fd タスクは一つしか取り出さない。複数取り出すと干渉する *) 
            Ready.add t.ready (Fd.take t.fd fds_ready); true
  ;;

  (** thunk registeration functions *)
  let whenever t = Ready.add t.ready
  let timed t = Timed.add t.timed
  let fd t = Fd.add t.fd

  (** scheduler algorithm:
      - run the ready thunks 
      - if no ready thunk available:
        - check timed thunks and if anything expires, add it 
          to the ready queue then run
        - if there is no timer expired thunks:
          - wait on fds and the next timer event 
            to add thunks to the ready queue
          - if nothing to wait: we are done
  *)
  let rec run t =
    try 
      Thunk.run (Ready.take t.ready); 
      run t
    with
    | Not_found ->
        let next_time_out, added = ready_timed t in
        
        if added then run t
        else if ready_fd t next_time_out then run t
        else () (* end *)
  ;;

  let create () = { ready = Ready.create ();
                    timed = Timed.create ();
                    fd = Fd.create (); }
end

まあ、要するに centralize された select があって、その周りの管理テーブルがあって、そんな、感じ。GUI のツールキットのイベントハンドラのスケジューラの内部とかもこんなんですよ。C とかで書くから泣きそうになるけど、これは ML だからカンタンですねー。

世界は一つ

Scheduler_gen モジュールではスケジューラ渡しの形で実装を行いましたが、実の所スケジューラが複数あってもらっても困ります。なので、一つに制限して、スケジューラ渡しをやめたモジュール Schedule を作ります:

(** We can only have one scheduler, [the_scheduler] *)
module Scheduler : sig
  val run : unit -> unit
  val whenever : Thunk.t -> unit
  val after : float (* secs *) -> Thunk.t -> unit
    (** [after sec thunk] executes [thunk] at least [sec] after *)
  val read : Unix.file_descr -> Thunk.t -> unit
  val write : Unix.file_descr -> Thunk.t -> unit
end = struct
  let the_scheduler = Scheduler_gen.create ()

  (** functions are wrapped so that they use [the_scheduler] *)

  let run () = Scheduler_gen.run the_scheduler
  let whenever = Scheduler_gen.whenever the_scheduler
  let timed = Scheduler_gen.timed the_scheduler
  let fd = Scheduler_gen.fd the_scheduler

  (** We use [Unix.time], which serves only second granularity.
      It is sad, but it is simple. *)
  let after sec = timed (Unix.time () +. sec)

  let read = fd `Read
  let write = fd `Write
end

timed から after、 fd から read, write を派生させてますがカンタンですね。

Monadic にしよう!!

さあ、これでスケジューラができましたから、これを使えば LWT プログラムが書けるわけです。でも、面倒くさい。タスクが入出力などで一時中断するときは必ず Schedule.read とか書かないといけません。その上それを書くのを忘れるとスケジュールされなくなって、全く asynchronous じゃなくなります。危険です。なので、Schedule 関数を隠蔽して、もっと自然に LWT が書けるようにしましょう。ここで、モナド風の味付けが使えます!! モナドって Haskell だけじゃなくて OCaml でも便利なんですよ!! やっぱ関数型だなあ、と実感できます。

'a Monad.t (以下 'a t) は 「'a という型の何時か計算する」タスクです。たとえば、

  • sleep 3 : unit t 三秒待ってから () を返すタスク
  • read fd str pos len : int t fd からデータを読み込んだら、読み込んだバイト数を返すタスク

とかが書けます。さっきまでの Thunk.t というタスクの型は unit -> unit、つまり、「何かやるだけ」という型でした。'a t というタスクモナドの方が表現力がありますね。

この 'a t を Monad.bind (二項演算子 >>=) で組み合わせて「何時か何かを計算する」タスクをどんどん作っていきます。たとえば、

  • sleep 3 >>= fun () -> read fd str pos len : int t 三秒後に fd からデータを読み込んでみて、読み込んだら、読み込みバイト数を返すタスク

とかが記述できます。こうやって組み合わせていって、最後にできた大きなタスクを Monad.run に食わせると asynchronous に計算が行われる。という訳です。

このタスクモナド 'a t を元のタスク Thunk.t = unit -> unit で実装してやるのですが、ちょっと考えないとできないかもしれません。どやって思いつくのかは省いて答えだけ書くと、continuation (継続) の概念を使います。'a t で何か 'a の値を計算してやったら、その結果を使って普通は何かをやりたい訳ですが、その仕事を継続 k としましょう。k は 'a を受け取って何かしらお仕事をして他の色々なタスクも全部処理して終了!!もうこれ以上やること無いよ!!という計算なので、結果は unit です。k の型は 'a -> unit になります。タスクモナド 'a t はこの継続 k を受け取って、自分のタスク計算で得られた 'a の値を放り込んでやればよろしい。 なので、 'a t の型の実装は ('a -> unit) -> unit になります。まーよくわからんでも次の実装をじっと眺めると動くってわかりますよ。

ぶっちゃけて言うなら、 unit -> unit である Thunk.t を途中でバンっと叩き切ったら、上流が 'a t。断面からドロッと出てきたのが 'a。'a を受け取って Thunk.t の最後までが継続。

(** The monadic API *)

module Monad : sig

  type 'a t

  val return : 'a -> 'a t
    (* 既にある値をそのままタスクにします。スケジュール計算は行われません。*)

  val bind : 'a t -> ('a -> 'b t) -> 'b t
    (* あるタスクモナドから別のタスクモナドを作り出します。これでどんどん大きなタスクを構築していく *) 

  val (>>=) : 'a t -> ('a -> 'b t) -> 'b t
    (** same as bind *)

  val run : unit t -> unit
     (* 最終的に出来たタスクモナドを実行 *)

  val whenever : unit t -> unit
    (** run the thunk at any time. the result is discarded *)
    (* 並行してタスクを実行。結果は捨てられます *)

  val sleep : float -> unit t
     (* 寝て、unit を返すタスク *)

  val read : Unix.file_descr -> string -> int -> int -> int t
  val write : Unix.file_descr -> string -> int -> int -> int t
     (* fd の読み書きが出来るのを待ってから、 read/write を行い、
        終了後結果を返すタスク *)

end = struct

  type 'a t = ('a -> unit) -> unit
    (** ['a t] receives the continuation function ('a -> t)
	and once ['a t]'s computation of ['a] is done, pass it
	to the continuation *)

  let return (v : 'a) : 'a t = fun k -> k v
     (* v をそのまま継続に渡すだけ *)

  let bind (t : 'a t) (f : 'a -> 'b t) : 'b t = fun k ->
    (* k は 'b t のための継続なので、これから 'a t のための継続を作ってやる。(fun () -> ... k) すると Thunk.t になるので、これをちゃんと Schedule してやる *)
    let k' a = Scheduler.whenever (fun () -> f a k) in
    Scheduler.whenever (fun () -> t k')

  let (>>=) = bind
    
  let whenever (t : unit t) = Scheduler.whenever (fun () -> t ignore)
    (* 結果は無視してよいので ignore *)

  let sleep sec : unit t = fun k -> Scheduler.after sec k

  let read fd buf off len = fun k ->
    Scheduler.read fd (fun () -> k (Unix.read fd buf off len))

  let write fd buf off len = fun k ->
    Scheduler.write fd (fun () -> k (Unix.write fd buf off len))

  let run (t : unit t) = whenever t; Scheduler.run ()
end

まーちょっと最後は飛ばしてますけど、眠いんで。

出来た

というわけで結構短い行数で書けました。次の例は、

  • 三秒に一度 ! とプリントするループ
  • 標準入力を標準出力に書き出すループ

を二つ asynchronous に動かすプログラムです。
Monad 記法が面倒くさいかもしれませんが、慣れれば結構簡単に async プログラムが書けることがわかりますよ。

open Monad

let _ = 
  let rec loop f = f () >>= fun () -> loop f in
  
  let buf = String.create 256 in
  let rec cat () =
    read Unix.stdin buf 0 256 >>= fun read ->
      let rec writer pos =
	write Unix.stdout buf pos (read - pos) >>= fun wrote ->
	  let pos = pos + wrote in
	  if pos = read then return ()
	  else writer pos
      in
      writer 0
  in
  whenever (loop cat);
  whenever (loop (fun () -> 
    sleep 3.0 >>= fun () -> 
      prerr_endline "!"; return ()));
  run (return ())
;;

これから

もちろん、これはとにかく短くて動くものを作りたかったので作りこみが甘いです。
たとえば、read/write/sleep 以外でも時間がかかるシステムコールは沢山あります。file のオープンだとか、 gethostbyname だとか。これをちゃんと asynchronous にやるには select は使えないので、局地的に本物の thread を使ってあげる必要がありますね。後はタスク管理とか、タスクの選び方とか、、、適当です。まー、どうやって動いているのか、わかれば、いいのです。
ちゃんとしたものを使いたければ Lwt (http://ocsigen.org/lwt/) とかを使ってください。私は使った事ないけど。多分ちゃんとしてるでしょ。

さて、次は超カンタンな FRP かな?いやーこれはカンタンにはいかないな…