277 lines | 9168 chars
1 | (**************************************************************************) |
2 | (* *) |
3 | (* OCaml *) |
4 | (* *) |
5 | (* David Nowak and Xavier Leroy, projet Cristal, INRIA Rocquencourt *) |
6 | (* *) |
7 | (* Copyright 1996 Institut National de Recherche en Informatique et *) |
8 | (* en Automatique. *) |
9 | (* *) |
10 | (* All rights reserved. This file is distributed under the terms of *) |
11 | (* the GNU Lesser General Public License version 2.1, with the *) |
12 | (* special exception on linking described in the file LICENSE. *) |
13 | (* *) |
14 | (**************************************************************************) |
15 | |
16 | (* Events *) |
(* The three operations attached to one candidate communication once its
   behavior has been instantiated for a particular synchronization. *)
type 'a basic_event =
  { poll: unit -> bool;
      (* If communication can take place immediately, perform it and
         return true; otherwise return false. *)
    suspend: unit -> unit;
      (* Offer the communication on the channel and get ready
         to suspend current process. *)
    result: unit -> 'a }
      (* Return the result of the communication. It is only called once
         the communication has been performed. *)
25 | |
(* A behavior builds the basic event used for one synchronization attempt.
   Its arguments are, in order: the [performed] cell shared by all the
   candidates of that attempt, the condition variable on which the
   synchronizing thread waits, and the index of this candidate in the
   array of candidates. *)
type 'a behavior = int ref -> Condition.t -> int -> 'a basic_event
27 | |
(* Events as assembled by the combinators below. [flatten_event] reduces
   such a tree to a flat list of behaviors before synchronization. *)
type 'a event =
    Communication of 'a behavior
      (* A single primitive communication. *)
  | Choose of 'a event list
      (* A choice between several alternative events. *)
  | WrapAbort of 'a event * (unit -> unit)
      (* An event together with a function to call when the communication
         finally performed does not belong to that event. *)
  | Guard of (unit -> 'a event)
      (* An event that is computed by calling the function when the
         event tree is flattened. *)
33 | |
34 | (* Communication channels *) |
type 'a channel =
  { mutable writes_pending: 'a communication Queue.t;
      (* All offers to write on it *)
    mutable reads_pending: 'a communication Queue.t }
      (* All offers to read from it *)

(* Communication offered *)
and 'a communication =
  { performed: int ref;
      (* Cell shared by all the offers made by one synchronization.
         -1 while nothing has been performed yet; once a rendez-vous has
         taken place it holds the event number of the offer that was
         matched. Any value >= 0 makes pollers skip the offer. *)
    condition: Condition.t;
      (* To restart the blocked thread. *)
    mutable data: 'a option;
      (* The data sent or received. *)
    event_number: int }
      (* Event number in select, i.e. the index of this offer among the
         candidates of its synchronization. *)
48 | |
49 | (* Create a channel *) |
50 | |
(* [new_channel ()] returns a fresh channel on which no read or write
   offer is pending yet. *)
let new_channel () =
  let writes_pending = Queue.create () in
  let reads_pending = Queue.create () in
  { writes_pending; reads_pending }
54 | |
55 | (* Basic synchronization function *) |
56 | |
(* Single global mutex. [basic_sync] and [basic_poll] hold it while they
   poll and suspend on events, and [basic_sync] passes it to
   [Condition.wait] when blocking. *)
let masterlock = Mutex.create()
58 | |
(* [do_aborts abort_env genev performed] runs the abort functions after a
   synchronization attempt.
   - [abort_env] associates each abort identifier with its function.
   - [genev] is the array of candidates; the second component of each
     entry lists the abort identifiers that must NOT fire when that
     candidate is the one performed.
   - [performed] is the index of the performed candidate, or a negative
     number when nothing was performed, in which case every abort
     function is called.
   Functions are called in the order of [abort_env]. Nothing is evaluated
   (not even [genev.(performed)]) when [abort_env] is empty. *)
let do_aborts abort_env genev performed =
  match abort_env with
  | [] -> ()
  | _ :: _ ->
    if performed < 0 then
      List.iter (fun (_, abort) -> abort ()) abort_env
    else begin
      let (_, spared_ids) = genev.(performed) in
      let run_unless_spared (id, abort) =
        if not (List.mem id spared_ids) then abort () in
      List.iter run_unless_spared abort_env
    end
70 | |
(* [basic_sync abort_env genev] blocks until one of the candidates in
   [genev] has been performed, then returns the value produced by that
   candidate's [result] function. [genev] must not be empty. When
   [abort_env] is not empty, [do_aborts] is run after the result has been
   computed. *)
let basic_sync abort_env genev =
  let performed = ref (-1) in
  let condition = Condition.create () in
  let count = Array.length genev in
  (* Instantiate every behavior with the shared cell, the shared
     condition variable and its own index. *)
  let instantiate i = (fst genev.(i)) performed condition i in
  let bev = Array.make count (instantiate 0) in
  for i = 1 to count - 1 do
    bev.(i) <- instantiate i
  done;
  (* True as soon as one candidate could be performed immediately;
     candidates after it are not polled. *)
  let rec any_ready i =
    i < count && (bev.(i).poll () || any_ready (i + 1)) in
  Mutex.lock masterlock;
  if not (any_ready 0) then begin
    (* Nothing ready: publish every offer, then sleep until signalled. *)
    Array.iter (fun candidate -> candidate.suspend ()) bev;
    Condition.wait condition masterlock;
    (* PR#7013: a wake-up may be spurious, so keep waiting until some
       candidate has really been performed. *)
    while !performed < 0 do Condition.wait condition masterlock done
  end;
  Mutex.unlock masterlock;
  match abort_env with
  | [] ->
    (* No abort to run: keep the call to [result] in tail position. *)
    bev.(!performed).result ()
  | _ :: _ ->
    let num = !performed in
    let result = bev.(num).result () in
    (* Run the aborts only once the result has been computed. *)
    do_aborts abort_env genev num;
    result
105 | |
106 | (* Apply a random permutation on an array *) |
107 | |
(* [scramble_array a] permutes [a] in place at random and returns [a]
   itself. Position [i] is swapped with a position drawn by
   [Random.int (i + 1)], for [i] going from the last index down to 1.
   Raises [Invalid_argument "Event.choose"] when [a] is empty. *)
let scramble_array a =
  match Array.length a with
  | 0 -> invalid_arg "Event.choose"
  | len ->
    let swap i j =
      let saved = a.(i) in
      a.(i) <- a.(j);
      a.(j) <- saved in
    let rec shuffle_from i =
      if i >= 1 then begin
        swap i (Random.int (i + 1));
        shuffle_from (i - 1)
      end in
    shuffle_from (len - 1);
    a
116 | |
117 | (* Main synchronization function *) |
118 | |
(* [gensym ()] returns a fresh integer on each call: 1 on the first call,
   then 2, 3, and so on. [flatten_event] uses these integers as abort
   identifiers. *)
let gensym =
  let next_id = ref 1 in
  fun () ->
    let id = !next_id in
    next_id := id + 1;
    id
120 | |
(* [flatten_event abort_list accu accu_abort ev] reduces the event tree
   [ev] to its primitive communications.
   - [abort_list] holds the identifiers of the [WrapAbort] nodes crossed
     on the way down to the current node.
   - [accu] collects each behavior paired with the [abort_list] in force
     where it was found.
   - [accu_abort] collects every (identifier, abort function) pair met.
   Returns the two extended accumulators. Guards are forced by calling
   their function, and the alternatives of a choice are visited from
   left to right. *)
let rec flatten_event
    (abort_list : int list)
    (accu : ('a behavior * int list) list)
    (accu_abort : (int * (unit -> unit)) list)
    ev =
  match ev with
  | Guard make_event ->
    flatten_event abort_list accu accu_abort (make_event ())
  | Communication bev ->
    ((bev, abort_list) :: accu, accu_abort)
  | WrapAbort (inner, on_abort) ->
    (* A fresh identifier links the abort function to the events it
       encloses. *)
    let id = gensym () in
    flatten_event (id :: abort_list) accu ((id, on_abort) :: accu_abort) inner
  | Choose evl ->
    List.fold_left
      (fun (events, aborts) alternative ->
         flatten_event abort_list events aborts alternative)
      (accu, accu_abort)
      evl
140 | |
(* [sync ev] flattens [ev], puts the resulting candidates in random order
   and blocks in [basic_sync] until one of them has been performed,
   returning its result. *)
let sync ev =
  let (behaviors, abort_env) = flatten_event [] [] [] ev in
  let candidates = scramble_array (Array.of_list behaviors) in
  basic_sync abort_env candidates
144 | |
145 | (* Event polling -- like sync, but non-blocking *) |
146 | |
(* [basic_poll abort_env genev] is the non-blocking counterpart of
   [basic_sync]. If one of the candidates in [genev] can be performed
   immediately, it is performed and [Some] of its result is returned
   after running the aborts of the other candidates. Otherwise every
   abort function is run and [None] is returned. [genev] must not be
   empty. *)
let basic_poll abort_env genev =
  let performed = ref (-1) in
  let condition = Condition.create () in
  let count = Array.length genev in
  (* Instantiate every behavior with the shared cell, the shared
     condition variable and its own index. *)
  let instantiate i = (fst genev.(i)) performed condition i in
  let bev = Array.make count (instantiate 0) in
  for i = 1 to count - 1 do
    bev.(i) <- instantiate i
  done;
  (* True as soon as one candidate could be performed immediately. *)
  let rec any_ready i =
    i < count && (bev.(i).poll () || any_ready (i + 1)) in
  Mutex.lock masterlock;
  match any_ready 0 with
  | true ->
    Mutex.unlock masterlock;
    (* The result is computed outside the lock, before the aborts. *)
    let result = Some (bev.(!performed).result ()) in
    do_aborts abort_env genev !performed;
    result
  | false ->
    (* Mark the attempt as over before releasing the lock. *)
    performed := 0;
    Mutex.unlock masterlock;
    do_aborts abort_env genev (-1);
    None
174 | |
(* [poll ev] flattens [ev], puts the resulting candidates in random order
   and tries them once with [basic_poll], without blocking. *)
let poll ev =
  let (behaviors, abort_env) = flatten_event [] [] [] ev in
  let candidates = scramble_array (Array.of_list behaviors) in
  basic_poll abort_env candidates
178 | |
179 | (* Remove all communication opportunities already synchronized *) |
180 | |
(* [cleanup_queue q] returns a new queue holding, in the same order, the
   offers of [q] whose [performed] cell still contains -1. [q] itself is
   left unchanged. *)
let cleanup_queue q =
  let still_pending = Queue.create () in
  let keep_if_pending offer =
    match !(offer.performed) with
    | -1 -> Queue.add offer still_pending
    | _ -> () in
  Queue.iter keep_if_pending q;
  still_pending
185 | |
186 | (* Event construction *) |
187 | |
(* [always data] is an event that is ready at once: polling it records it
   as the performed event and succeeds, suspending on it does nothing,
   and its result is [data]. *)
let always data =
  let behavior performed _condition evnum =
    let poll () = (performed := evnum; true) in
    { poll; suspend = ignore; result = (fun () -> data) } in
  Communication behavior
193 | |
(* [send channel data] is the event that transmits [data] on [channel].
   Its result is [()]. *)
let send channel data =
  Communication (fun performed condition evnum ->
    (* The write offer published on the channel when we have to wait. *)
    let wcomm =
      { performed; condition; data = Some data; event_number = evnum } in
    (* Take read offers off the channel until one that has not been
       performed yet is found. Hand the data over to it, mark both sides
       as performed and wake the reader up. Returns false when the queue
       of read offers runs out. *)
    let rec match_reader () =
      match Queue.take channel.reads_pending with
      | exception Queue.Empty -> false
      | rcomm ->
        if !(rcomm.performed) >= 0 then
          match_reader ()
        else begin
          rcomm.data <- wcomm.data;
          performed := evnum;
          rcomm.performed := rcomm.event_number;
          Condition.signal rcomm.condition;
          true
        end in
    (* Drop the write offers that are no longer pending, then add ours. *)
    let enqueue_offer () =
      channel.writes_pending <- cleanup_queue channel.writes_pending;
      Queue.add wcomm channel.writes_pending in
    { poll = match_reader;
      suspend = enqueue_offer;
      result = (fun () -> ()) })
221 | |
(* [receive channel] is the event that reads one value from [channel] and
   returns it as its result. *)
let receive channel =
  Communication (fun performed condition evnum ->
    (* The read offer published on the channel when we have to wait. *)
    let rcomm =
      { performed; condition; data = None; event_number = evnum } in
    (* Take write offers off the channel until one that has not been
       performed yet is found. Copy its data, mark both sides as performed
       and wake the writer up. Returns false when the queue of write
       offers runs out. *)
    let rec match_writer () =
      match Queue.take channel.writes_pending with
      | exception Queue.Empty -> false
      | wcomm ->
        if !(wcomm.performed) >= 0 then
          match_writer ()
        else begin
          rcomm.data <- wcomm.data;
          performed := evnum;
          wcomm.performed := wcomm.event_number;
          Condition.signal wcomm.condition;
          true
        end in
    (* Drop the read offers that are no longer pending, then add ours. *)
    let enqueue_offer () =
      channel.reads_pending <- cleanup_queue channel.reads_pending;
      Queue.add rcomm channel.reads_pending in
    (* The value stored in our offer; its absence is reported as an
       invalid argument. *)
    let received () =
      match rcomm.data with
      | Some res -> res
      | None -> invalid_arg "Event.receive" in
    { poll = match_writer;
      suspend = enqueue_offer;
      result = received })
252 | |
(* [choose evl] is the event that offers all the alternatives of [evl].
   No check is made here: an event that flattens to no communication at
   all is rejected by [scramble_array], with
   [Invalid_argument "Event.choose"], when it is passed to [sync] or
   [poll]. *)
let choose evl = Choose evl
254 | |
(* [wrap_abort ev fn] performs the same communications as [ev]. In
   addition, [fn] is called by [do_aborts] after a synchronization whose
   performed communication does not belong to [ev], or after a [poll]
   that performed nothing. *)
let wrap_abort ev fn = WrapAbort(ev,fn)
256 | |
(* [guard fn] is the event obtained by calling [fn ()]; the call is made
   by [flatten_event], each time the event is flattened. *)
let guard fn = Guard fn
258 | |
(* [wrap ev fn] performs the same communications as [ev] and applies [fn]
   to the result of whichever communication is performed. Only the
   [result] function of each primitive communication is changed; choices,
   aborts and guards are traversed and rebuilt around the wrapped
   events. *)
let rec wrap ev fn =
  match ev with
  | Guard gu ->
    (* The guard stays delayed; wrapping happens when it is forced. *)
    Guard (fun () -> wrap (gu ()) fn)
  | WrapAbort (inner, on_abort) ->
    WrapAbort (wrap inner fn, on_abort)
  | Choose evl ->
    Choose (List.map (fun alternative -> wrap alternative fn) evl)
  | Communication genev ->
    let wrapped performed condition evnum =
      let { poll; suspend; result } = genev performed condition evnum in
      { poll; suspend; result = (fun () -> fn (result ())) } in
    Communication wrapped
273 | |
274 | (* Convenience functions *) |
275 | |
(* [select evl] synchronizes on the choice between the events of [evl];
   it is [sync] applied to [choose evl]. *)
let select evl = sync (choose evl)
277 |