Effective ATS:
生産者/消費者問題

生産者/消費者問題は並行プログラミングにおける古典的な問題です。 この記事では、依存型と線形型を効果的に使った、この問題に対する実装を紹介しようと思います。 並行プログラムをデバッグすることがとてつもなく困難なため、バグの検出と修正を静的な検査に頼ることは、逐次実行プログラミングよりも並行プログラミングにおいてさらに重要になります。 ここで紹介するプログラミングのスタイルは習得するのに時間がかかるかもしれませんが、その大きな利点はすぐに評価できるでしょう。 一般的に、動的なテストが難しい文脈において、ATS はより異彩を放っています。

この問題についての解説

有限容量のバッファが与えられたとき、そのバッファに要素を挿入する複数の生産者と、そのバッファから要素を取り出す複数の消費者がいます。 もし生産者が要素を挿入しようとしたけれどバッファが満杯であったなら、バッファが満杯でなくなるまでその生産者はブロックします。 もし消費者が要素を取り出そうとしたけれどバッファが空であったなら、バッファが空でなくなるまでその消費者はブロックします。

線形バッファのインターフェイス

はじめに次のようにバッファを表わす線形抽象型を宣言しましょう:
//
absvtype
buffer_vtype (a:vt@ype+, m:int, n: int) = ptr
//
vtypedef
buffer (a:vt0p) = [m,n:int] buffer_vtype (a, m, n)
vtypedef
buffer (a:vt0p, m:int, n:int) = buffer_vtype (a, m, n)
//
型 T と2つの整数 M, N が与えられたとき、型 [buffer(T, M, N)] は型 T の要素が N 個保存された容量 M のバッファを表わします。 次の補題 (lemma) は (制約解決を目的として) M >= N かつ N >= 0 という特性を成立させるために使うことができます:
praxi
lemma_buffer_param{a:vt0p}
  {m,n:int}(!buffer (INV(a), m, n)): [m >= n; n >= 0] void
関数 [buffer_make_nil] は要素を含まない与えられた容量の線形バッファを生成することができます:
fun{a:vt0p}
buffer_make_nil{m:pos} (cap: int m): buffer (a, m, 0)
関数 [buffer_isnil] と [buffer_isful] は与えられたバッファがそれぞれ空なのか、満杯なのかを判定できます:
fun buffer_isnil{a:vt0p}
  {m,n:int} (!buffer (INV(a), m, n)): bool (n==0)
fun buffer_isful{a:vt0p}
  {m,n:int} (!buffer (INV(a), m, n)): bool (m==n)
関数 [buffer_insert] は満杯でないバッファに要素を挿入します:
fun{a:vt0p}
buffer_insert{m,n:int | n < m}
(
  !buffer (INV(a), m, n) >> buffer (a, m, n+1), x: a
) : void // end of [buffer_insert]
関数 [buffer_takeout] は空でないバッファから要素を取り出します:
fun{a:vt0p}
buffer_takeout{m,n:int | n > 0}
  (buf: !buffer (INV(a), m, n) >> buffer (a, m, n-1)): (a)

共有バッファのインターフェイス

実装の観点から、共有バッファは保護メカニズムで線形バッファを包みます。 このメカニズムは1つの mutex と2つの条件変数から成り立っています。 この mutex は線形バッファを保護するために、この条件変数はビジーウェイトを回避するために導入されています。 この保護メカニズムについて後でより詳細に説明します。

ここで、共有バッファを表わす抽象型を導入してみましょう:

//
abstype
sbuffer_type (a:vt@ype) = ptr
//
typedef sbuffer (a:vt0p) = sbuffer_type (a)
//
[sbuffer] は非線形の型であることに注意してください。 これは、共有バッファは生成された後では明示的に解放することができないことを意味しています。 共有バッファを解放できるように、参照カウントの線形型で [sbuffer] を作ることも可能です。 興味のある読者にはこの別設計に挑戦してみることをおすすめします。 これはわずかに複雑な実装になるはずです。

線形バッファが与えられたとき、[sbuffer_make_buffer] は (保護メカニズムで包むことで) それを共有バッファに変えます:

fun{a:vt0p}
sbuffer_make_buffer (buffer(a)): sbuffer (a)
関数 [sbuffer_insert] は共有バッファへ要素を挿入にします:
fun{a:vt0p}
sbuffer_insert (sbuffer(a), x: a): void // called by producer
[sbuffer_insert] 呼び出しは、共有バッファ中の線形バッファが満杯であるためにブロックする可能性があることに注意してください。

関数 [sbuffer_takeout] は共有バッファから要素を取り出します:

fun{a:vt0p}
sbuffer_takeout (sbuf: sbuffer(a)): (a) // called by consumer
[sbuffer_takeout] 呼び出しは、共有バッファ中の線形バッファが空であるためにブロックする可能性があることに注意してください。

共有バッファの実装

共有バッファ中の線形バッファは mutex によって保護されています。

関数 [sbuffer_acquire] は与えられた共有バッファ中の線形バッファを獲得します:

fun sbuffer_acquire{a:vt0p} (sbuf: sbuffer(a)): buffer (a)
関数 [sbuffer_release] は (線形バッファを獲得したときと同じ共有バッファに) 線形バッファを手放します:
fun
sbuffer_release{a:vt0p} (sbuf: sbuffer(a), buf: buffer(a)): void
ここで、次のような2つの関数を宣言しましょう:
fun{a:vt0p}
sbuffer_insert2 (sbuffer(a), !buffer(INV(a)) >> _, x: a): void
fun{a:vt0p}
sbuffer_takeout2 (sbf: sbuffer(a), buf: !buffer (INV(a)) >> _): (a)
関数 [sbuffer_insert2] を呼び出すと、与えられた線形バッファに要素を挿入しようと試みます。 線形バッファが満杯であった場合、線形バッファが満杯でなくなるまで条件変数を待ち合わせるために呼び出し元はブロックします。

関数 [sbuffer_takeout2] を呼び出すと、与えられた線形バッファから要素を取り出そうと試みます。 線形バッファが空であった場合、線形バッファが空でなくなるまで条件変数を待ち合わせるために呼び出し元はブロックします。

[sbuffer_insert] と [sbuffer_takeout] はそれぞれ [sbuffer_insert2] と [sbuffer_takeout2] を用いて素直に実装できます:

implement{a}
sbuffer_insert (sbuf, x) =
{
  val buf = sbuffer_acquire (sbuf)
  val ((*void*)) = sbuffer_insert2 (sbuf, buf, x)
  val ((*void*)) = sbuffer_release (sbuf, buf)
}

implement{a}
sbuffer_takeout (sbuf) = x where
{
  val buf = sbuffer_acquire (sbuf)
  val x(*a*) = sbuffer_takeout2 (sbuf, buf)
  val ((*void*)) = sbuffer_release (sbuf, buf)
}
共有バッファの中には2つの条件変数があります。 それらの1つは (共有バッファ中の) 線形バッファが空である条件を扱うためのものです。 もう1つは線形バッファが満杯である条件を扱うためのものです。 次の関数群はこれら2つの条件変数を直接用いて実装されています:
//
fun
sbuffer_wait_isnil
  {a:vt0p}{m:int}
  (sbuffer(a), !buffer(a, m, 0) >> buffer(a)): void
fun
sbuffer_signal_isnil{a:vt0p}{m,n:int} (sbuf: sbuffer(a)): void
//
fun
sbuffer_wait_isful
  {a:vt0p}{m:int}
  (sbuffer(a), !buffer(a, m, m) >> buffer(a)): void
fun
sbuffer_signal_isful{a:vt0p}{m,n:int} (sbuf: sbuffer(a)): void
//
関数 [sbuffer_wait_isnil] に割り当てられた型は、呼び出す時に呼び出し元が線形バッファの所有者でなければならないこと、呼び出しが返る時にやはりその呼び出し元が空の可能性がある線形バッファの所有者でなければならないこと、を意味しています。 [sbuffer_wait_isnil] 呼び出しが返る時にも線形バッファを所有すべき理由は、 他の呼び出し元 (消費者) がより早くその線形バッファへのアクセス権を獲得してしまう可能性があるためです。

関数 [sbuffer_wait_isful] に割り当てられた型も同様に説明できます。

[sbuffer_insert2] を実装する

[sbuffer_insert2] の実装は次のようになります:
implement{a}
sbuffer_insert2
  (sbuf, buf, x) = let
//
val isful = buffer_isful (buf)
//
prval () = lemma_buffer_param (buf)
//
in
//
if isful
  then let
    val () =
      sbuffer_wait_isful (sbuf, buf)
    // end of [val]
  in
    sbuffer_insert2 (sbuf, buf, x)
  end // end of [then]
  else let
    val isnil = buffer_isnil (buf)
    val ((*void*)) = buffer_insert (buf, x)
    val ((*void*)) = if isnil then sbuffer_signal_isnil (sbuf)
  in
    // nothing
  end // end of [else]
//  
end // end of [sbuffer_insert2]
このコードは自明でしょう。 空のバッファに要素を挿入するには、バッファが空である条件を扱う条件変数にシグナルを送る必要があることに注意してください。 もしこれを行なわないと、決して起こされない条件変数をを待ち合わせる消費者によってデッドロックが発生する可能性があります。

[sbuffer_takeout2] を実装する

implement{a}
sbuffer_takeout2
  (sbuf, buf) = let
//
val isnil = buffer_isnil (buf)
prval () = lemma_buffer_param (buf)
//
in
//
if isnil
  then let
    val () =
      sbuffer_wait_isnil (sbuf, buf)
    // end of [val]
  in
    sbuffer_takeout2 (sbuf, buf)
  end // end of [then]
  else x where
  {
    val isful = buffer_isful (buf)
    val x(*a*) = buffer_takeout (buf)
    val ((*void*)) = if isful then sbuffer_signal_isful (sbuf)
  } (* end of [else] *)
//  
end // end of [sbuffer_takeout2]
このコードも自明でしょう。 満杯のバッファから要素を取り出すには、バッファが満杯である条件を扱う条件変数にシグナルを送る必要があることに注意してください。 もしこれを行なわないと、決して起こされない条件変数をを待ち合わせる生産者によってデッドロックが発生する可能性があります。

残りの実装について

生産者/消費者問題の残りの実装は (pthread がサポートしている mutex と条件変数を用いて) C言語によってすぐに作ることができます。 例えば、[sbuffer] は次の構造体型の値へのポインタとして表現できます:
struct
{
  void *buffer ;
  mutex_t mutex ;
  cond_t CVisnil ;
  cond_t CVisful ;
}
直接C言語でコーディングする代わりに、私の実装の残りを ATS で書くこともできますが、安全でないプログラミングの機能を全面的に使うことになります。 このコードの全体はファイル sbuffer.sats と sbuffer.dats から入手できます。 実装の正確さに自信があるので、私の実装にはテストコードがありません。 ATS でプログラミングをする主要な理由は、意図通りにコードが実行されるのか自分を納得させるために ATS の型を効果的に使えることです。 今回の場合はそれができたと感じています。

共有リソースを共なう安全なプログラミング

この記事の残りでは、上記で説明した生産者/消費者問題を一般化したような、共有リソースを共なう安全なプログラミングに対する一般的なアプローチの要点を説明しようと思います。

共有しようとしている値のために線形型 [Resource] を想像してみましょう:

absvtype Resource
なんらかの保護メカニズムで型 [Resource] の線形値を包んで作られる値のために (非線形な) 型 [SharedResource] を導入してみましょう:
abstype SharedResource
その包む行為をする関数は次のような型になるでしょう:
fun SharedResource_create (R: Resource): SharedResource
共有リソースの中にある線形なリソースはロックによって保護されています。 そして次の2つは線形リソースを取り出す関数と、それを戻す関数です:
fun SharedResource_acquire (SR: SharedResource): Resource
fun SharedResource_release (SR: SharedResource, R: Resource): void
ここで、線形リソースを処理する次の関数があると仮定してみましょう:
fun Resource_process (R: !Resource >> _): bool
[Resource_process] 呼び出しが true を返したら、そのリソースは適切に処理されています。 そうでなければ、処理する前にそのリソースの状態を変更する必要があることを示していることになります。

私達が実装したいのは次の関数です:

fun SharedResource_process (SR: SharedResource): void
共有リソースに対して呼び出されると、[SharedResource_process] は [Resource_process] を呼び出して、その内にある線形リソースを処理しようと試みます。

[SharedResource_process] の実装は次のような概略になります:

//
extern
fun
SharedResource_wait
  (SR: SharedResource, R: !Resource >> _): void
extern
fun
SharedResource_process2
  (SR: SharedResource, R: !Resource >> _): void
//
implement
SharedResource_process
  (SR) = () where
{
  val R = SharedResource_acquire (SR)
  val () = SharedResource_process2 (SR, R)
  val () = SharedResource_release (SR, R)
}
//
implement
SharedResource_process2
  (SR, R) = let
  val ans = Resource_process (R)
in
//
if ans
  then
  (
    // processing is done properly
    // there may be a need to send signals
    // to some conditional variables
  )
  else let
    val () =
      SharedResource_wait (SR, R)
    // end of [val]
  in
    SharedResource_process2 (SR, R)
  end // end of [else]
//
end // end of [SharedResource_process2]
//
[SharedResource_wait] 呼び出しは、ビジーウェイトを回避するなんらかの条件変数の待ち行列にその呼び出し元を置きます。 処理を行なう [Resource_process] に必要な条件が満たされた時はいつでも、その条件変数に (別の呼び出し元によって) シグナルを送らねければなりません。

この記事で示したコードの全体はファイル SharedResource.dats から入手できます。


この記事は Hongwei Xi によって書かれ、 Japan ATS User Group によって翻訳されています。