1 (**************************************************************************)
2 (* *)
3 (* OCaml *)
4 (* *)
5 (* David Nowak and Xavier Leroy, projet Cristal, INRIA Rocquencourt *)
6 (* *)
7 (* Copyright 1996 Institut National de Recherche en Informatique et *)
8 (* en Automatique. *)
9 (* *)
10 (* All rights reserved. This file is distributed under the terms of *)
11 (* the GNU Lesser General Public License version 2.1, with the *)
12 (* special exception on linking described in the file LICENSE. *)
13 (* *)
14 (**************************************************************************)
15
16 (* Events *)
17 type 'a basic_event =
18 { poll: unit -> bool;
19 (* If communication can take place immediately, return true. *)
20 suspend: unit -> unit;
21 (* Offer the communication on the channel and get ready
22 to suspend current process. *)
23 result: unit -> 'a }
24 (* Return the result of the communication *)
25
26 type 'a behavior = int ref -> Condition.t -> int -> 'a basic_event
27
28 type 'a event =
29 Communication of 'a behavior
30 | Choose of 'a event list
31 | WrapAbort of 'a event * (unit -> unit)
32 | Guard of (unit -> 'a event)
33
34 (* Communication channels *)
35 type 'a channel =
36 { mutable writes_pending: 'a communication Queue.t;
37 (* All offers to write on it *)
38 mutable reads_pending: 'a communication Queue.t }
39 (* All offers to read from it *)
40
41 (* Communication offered *)
42 and 'a communication =
43 { performed: int ref; (* -1 if not performed yet, set to the number *)
44 (* of the matching communication after rendez-vous. *)
45 condition: Condition.t; (* To restart the blocked thread. *)
46 mutable data: 'a option; (* The data sent or received. *)
47 event_number: int } (* Event number in select *)
48
49 (* Create a channel *)
50
51 let new_channel () =
52 { writes_pending = Queue.create();
53 reads_pending = Queue.create() }
54
55 (* Basic synchronization function *)
56
57 let masterlock = Mutex.create()
58
59 let do_aborts abort_env genev performed =
60 if abort_env <> [] then begin
61 if performed >= 0 then begin
62 let ids_done = snd genev.(performed) in
63 List.iter
64 (fun (id,f) -> if not (List.mem id ids_done) then f ())
65 abort_env
66 end else begin
67 List.iter (fun (_,f) -> f ()) abort_env
68 end
69 end
70
71 let basic_sync abort_env genev =
72 let performed = ref (-1) in
73 let condition = Condition.create() in
74 let bev = Array.make (Array.length genev)
75 (fst (genev.(0)) performed condition 0) in
76 for i = 1 to Array.length genev - 1 do
77 bev.(i) <- (fst genev.(i)) performed condition i
78 done;
79 (* See if any of the events is already activable *)
80 let rec poll_events i =
81 if i >= Array.length bev
82 then false
83 else bev.(i).poll() || poll_events (i+1) in
84 Mutex.lock masterlock;
85 if not (poll_events 0) then begin
86 (* Suspend on all events *)
87 for i = 0 to Array.length bev - 1 do bev.(i).suspend() done;
88 (* Wait until the condition is signalled *)
89 Condition.wait condition masterlock;
90 (* PR#7013: protect against spurious wake-up *)
91 while !performed < 0 do Condition.wait condition masterlock done
92 end;
93 Mutex.unlock masterlock;
94 (* Extract the result *)
95 if abort_env = [] then
96 (* Preserve tail recursion *)
97 bev.(!performed).result()
98 else begin
99 let num = !performed in
100 let result = bev.(num).result() in
101 (* Handle the aborts and return the result *)
102 do_aborts abort_env genev num;
103 result
104 end
105
106 (* Apply a random permutation on an array *)
107
108 let scramble_array a =
109 let len = Array.length a in
110 if len = 0 then invalid_arg "Event.choose";
111 for i = len - 1 downto 1 do
112 let j = Random.int (i + 1) in
113 let temp = a.(i) in a.(i) <- a.(j); a.(j) <- temp
114 done;
115 a
116
117 (* Main synchronization function *)
118
119 let gensym = let count = ref 0 in fun () -> incr count; !count
120
121 let rec flatten_event
122 (abort_list : int list)
123 (accu : ('a behavior * int list) list)
124 (accu_abort : (int * (unit -> unit)) list)
125 ev =
126 match ev with
127 Communication bev -> ((bev,abort_list) :: accu) , accu_abort
128 | WrapAbort (ev,fn) ->
129 let id = gensym () in
130 flatten_event (id :: abort_list) accu ((id,fn)::accu_abort) ev
131 | Choose evl ->
132 let rec flatten_list accu' accu_abort'= function
133 ev :: l ->
134 let (accu'',accu_abort'') =
135 flatten_event abort_list accu' accu_abort' ev in
136 flatten_list accu'' accu_abort'' l
137 | [] -> (accu',accu_abort') in
138 flatten_list accu accu_abort evl
139 | Guard fn -> flatten_event abort_list accu accu_abort (fn ())
140
141 let sync ev =
142 let (evl,abort_env) = flatten_event [] [] [] ev in
143 basic_sync abort_env (scramble_array(Array.of_list evl))
144
145 (* Event polling -- like sync, but non-blocking *)
146
147 let basic_poll abort_env genev =
148 let performed = ref (-1) in
149 let condition = Condition.create() in
150 let bev = Array.make(Array.length genev)
151 (fst genev.(0) performed condition 0) in
152 for i = 1 to Array.length genev - 1 do
153 bev.(i) <- fst genev.(i) performed condition i
154 done;
155 (* See if any of the events is already activable *)
156 let rec poll_events i =
157 if i >= Array.length bev
158 then false
159 else bev.(i).poll() || poll_events (i+1) in
160 Mutex.lock masterlock;
161 let ready = poll_events 0 in
162 if ready then begin
163 (* Extract the result *)
164 Mutex.unlock masterlock;
165 let result = Some(bev.(!performed).result()) in
166 do_aborts abort_env genev !performed; result
167 end else begin
168 (* Cancel the communication offers *)
169 performed := 0;
170 Mutex.unlock masterlock;
171 do_aborts abort_env genev (-1);
172 None
173 end
174
175 let poll ev =
176 let (evl,abort_env) = flatten_event [] [] [] ev in
177 basic_poll abort_env (scramble_array(Array.of_list evl))
178
179 (* Remove all communication opportunities already synchronized *)
180
181 let cleanup_queue q =
182 let q' = Queue.create() in
183 Queue.iter (fun c -> if !(c.performed) = -1 then Queue.add c q') q;
184 q'
185
186 (* Event construction *)
187
188 let always data =
189 Communication(fun performed condition evnum ->
190 { poll = (fun () -> performed := evnum; true);
191 suspend = (fun () -> ());
192 result = (fun () -> data) })
193
194 let send channel data =
195 Communication(fun performed condition evnum ->
196 let wcomm =
197 { performed = performed;
198 condition = condition;
199 data = Some data;
200 event_number = evnum } in
201 { poll = (fun () ->
202 let rec poll () =
203 let rcomm = Queue.take channel.reads_pending in
204 if !(rcomm.performed) >= 0 then
205 poll ()
206 else begin
207 rcomm.data <- wcomm.data;
208 performed := evnum;
209 rcomm.performed := rcomm.event_number;
210 Condition.signal rcomm.condition
211 end in
212 try
213 poll();
214 true
215 with Queue.Empty ->
216 false);
217 suspend = (fun () ->
218 channel.writes_pending <- cleanup_queue channel.writes_pending;
219 Queue.add wcomm channel.writes_pending);
220 result = (fun () -> ()) })
221
222 let receive channel =
223 Communication(fun performed condition evnum ->
224 let rcomm =
225 { performed = performed;
226 condition = condition;
227 data = None;
228 event_number = evnum } in
229 { poll = (fun () ->
230 let rec poll () =
231 let wcomm = Queue.take channel.writes_pending in
232 if !(wcomm.performed) >= 0 then
233 poll ()
234 else begin
235 rcomm.data <- wcomm.data;
236 performed := evnum;
237 wcomm.performed := wcomm.event_number;
238 Condition.signal wcomm.condition
239 end in
240 try
241 poll();
242 true
243 with Queue.Empty ->
244 false);
245 suspend = (fun () ->
246 channel.reads_pending <- cleanup_queue channel.reads_pending;
247 Queue.add rcomm channel.reads_pending);
248 result = (fun () ->
249 match rcomm.data with
250 None -> invalid_arg "Event.receive"
251 | Some res -> res) })
252
253 let choose evl = Choose evl
254
255 let wrap_abort ev fn = WrapAbort(ev,fn)
256
257 let guard fn = Guard fn
258
259 let rec wrap ev fn =
260 match ev with
261 Communication genev ->
262 Communication(fun performed condition evnum ->
263 let bev = genev performed condition evnum in
264 { poll = bev.poll;
265 suspend = bev.suspend;
266 result = (fun () -> fn(bev.result())) })
267 | Choose evl ->
268 Choose(List.map (fun ev -> wrap ev fn) evl)
269 | WrapAbort (ev, f') ->
270 WrapAbort (wrap ev fn, f')
271 | Guard gu ->
272 Guard(fun () -> wrap (gu()) fn)
273
274 (* Convenience functions *)
275
276 let select evl = sync(Choose evl)
277