1 /**************************************************************************/
2 /* */
3 /* OCaml */
4 /* */
5 /* Contributed by Sylvain Le Gall for Lexifi */
6 /* */
7 /* Copyright 2008 Institut National de Recherche en Informatique et */
8 /* en Automatique. */
9 /* */
10 /* All rights reserved. This file is distributed under the terms of */
11 /* the GNU Lesser General Public License version 2.1, with the */
12 /* special exception on linking described in the file LICENSE. */
13 /* */
14 /**************************************************************************/
15
16 #include <caml/mlvalues.h>
17 #include <caml/alloc.h>
18 #include <caml/memory.h>
19 #include <caml/fail.h>
20 #include <caml/signals.h>
21 #include "winworker.h"
22 #include <stdio.h>
23 #include "windbug.h"
24 #include "winlist.h"
25
26 /* This constant define the maximum number of objects that
27 * can be handle by a SELECTDATA.
28 * It takes the following parameters into account:
29 * - limitation on number of objects is mostly due to limitation
30 * a WaitForMultipleObjects
31 * - there is always an event "hStop" to watch
32 *
33 * This lead to pick the following value as the biggest possible
34 * value
35 */
36 #define MAXIMUM_SELECT_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
37
38 /* Manage set of handle */
39 typedef struct _SELECTHANDLESET {
40 LPHANDLE lpHdl;
41 DWORD nMax;
42 DWORD nLast;
43 } SELECTHANDLESET;
44
45 typedef SELECTHANDLESET *LPSELECTHANDLESET;
46
47 void handle_set_init (LPSELECTHANDLESET hds, LPHANDLE lpHdl, DWORD max)
48 {
49 DWORD i;
50
51 hds->lpHdl = lpHdl;
52 hds->nMax = max;
53 hds->nLast = 0;
54
55 /* Set to invalid value every entry of the handle */
56 for (i = 0; i < hds->nMax; i++)
57 {
58 hds->lpHdl[i] = INVALID_HANDLE_VALUE;
59 };
60 }
61
62 void handle_set_add (LPSELECTHANDLESET hds, HANDLE hdl)
63 {
64 LPSELECTHANDLESET res;
65
66 if (hds->nLast < hds->nMax)
67 {
68 hds->lpHdl[hds->nLast] = hdl;
69 hds->nLast++;
70 }
71
72 DEBUG_PRINT("Adding handle %x to set %x", hdl, hds);
73 }
74
75 BOOL handle_set_mem (LPSELECTHANDLESET hds, HANDLE hdl)
76 {
77 BOOL res;
78 DWORD i;
79
80 res = FALSE;
81 for (i = 0; !res && i < hds->nLast; i++)
82 {
83 res = (hds->lpHdl[i] == hdl);
84 }
85
86 return res;
87 }
88
89 void handle_set_reset (LPSELECTHANDLESET hds)
90 {
91 DWORD i;
92
93 for (i = 0; i < hds->nMax; i++)
94 {
95 hds->lpHdl[i] = INVALID_HANDLE_VALUE;
96 }
97 hds->nMax = 0;
98 hds->nLast = 0;
99 hds->lpHdl = NULL;
100 }
101
102 /* Data structure for handling select */
103
104 typedef enum _SELECTHANDLETYPE {
105 SELECT_HANDLE_NONE = 0,
106 SELECT_HANDLE_DISK,
107 SELECT_HANDLE_CONSOLE,
108 SELECT_HANDLE_PIPE,
109 SELECT_HANDLE_SOCKET,
110 } SELECTHANDLETYPE;
111
112 typedef enum _SELECTMODE {
113 SELECT_MODE_NONE = 0,
114 SELECT_MODE_READ = 1,
115 SELECT_MODE_WRITE = 2,
116 SELECT_MODE_EXCEPT = 4,
117 } SELECTMODE;
118
119 typedef enum _SELECTSTATE {
120 SELECT_STATE_NONE = 0,
121 SELECT_STATE_INITFAILED,
122 SELECT_STATE_ERROR,
123 SELECT_STATE_SIGNALED
124 } SELECTSTATE;
125
126 typedef enum _SELECTTYPE {
127 SELECT_TYPE_NONE = 0,
128 SELECT_TYPE_STATIC, /* Result is known without running anything */
129 SELECT_TYPE_CONSOLE_READ, /* Reading data on console */
130 SELECT_TYPE_PIPE_READ, /* Reading data on pipe */
131 SELECT_TYPE_SOCKET /* Classic select */
132 } SELECTTYPE;
133
134 /* Data structure for results */
135 typedef struct _SELECTRESULT {
136 LIST lst;
137 SELECTMODE EMode;
138 int lpOrigIdx;
139 } SELECTRESULT;
140
141 typedef SELECTRESULT *LPSELECTRESULT;
142
143 /* Data structure for query */
144 typedef struct _SELECTQUERY {
145 LIST lst;
146 SELECTMODE EMode;
147 HANDLE hFileDescr;
148 int lpOrigIdx;
149 unsigned int uFlagsFd; /* Copy of filedescr->flags_fd */
150 } SELECTQUERY;
151
152 typedef SELECTQUERY *LPSELECTQUERY;
153
154 typedef struct _SELECTDATA {
155 LIST lst;
156 SELECTTYPE EType;
157 /* Sockets may generate a result for all three lists from one single
158 query object
159 */
160 SELECTRESULT aResults[MAXIMUM_SELECT_OBJECTS * 3];
161 DWORD nResultsCount;
162 /* Data following are dedicated to APC like call, they
163 will be initialized if required.
164 */
165 WORKERFUNC funcWorker;
166 SELECTQUERY aQueries[MAXIMUM_SELECT_OBJECTS];
167 DWORD nQueriesCount;
168 SELECTSTATE EState;
169 DWORD nError;
170 LPWORKER lpWorker;
171 } SELECTDATA;
172
173 typedef SELECTDATA *LPSELECTDATA;
174
175 /* Get error status if associated condition is false */
176 static BOOL check_error(LPSELECTDATA lpSelectData, BOOL bFailed)
177 {
178 if (bFailed && lpSelectData->nError == 0)
179 {
180 lpSelectData->EState = SELECT_STATE_ERROR;
181 lpSelectData->nError = GetLastError();
182 }
183 return bFailed;
184 }
185
186 /* Create data associated with a select operation */
187 LPSELECTDATA select_data_new (LPSELECTDATA lpSelectData, SELECTTYPE EType)
188 {
189 /* Allocate the data structure */
190 LPSELECTDATA res;
191 DWORD i;
192
193 res = (LPSELECTDATA)caml_stat_alloc(sizeof(SELECTDATA));
194
195 /* Init common data */
196 list_init((LPLIST)res);
197 list_next_set((LPLIST)res, (LPLIST)lpSelectData);
198 res->EType = EType;
199 res->nResultsCount = 0;
200
201
202 /* Data following are dedicated to APC like call, they
203 will be initialized if required. For now they are set to
204 invalid values.
205 */
206 res->funcWorker = NULL;
207 res->nQueriesCount = 0;
208 res->EState = SELECT_STATE_NONE;
209 res->nError = 0;
210 res->lpWorker = NULL;
211
212 return res;
213 }
214
215 /* Free select data */
216 void select_data_free (LPSELECTDATA lpSelectData)
217 {
218 DWORD i;
219
220 DEBUG_PRINT("Freeing data of %x", lpSelectData);
221
222 /* Free APC related data, if they exists */
223 if (lpSelectData->lpWorker != NULL)
224 {
225 worker_job_finish(lpSelectData->lpWorker);
226 lpSelectData->lpWorker = NULL;
227 };
228
229 /* Make sure results/queries cannot be accessed */
230 lpSelectData->nResultsCount = 0;
231 lpSelectData->nQueriesCount = 0;
232
233 caml_stat_free(lpSelectData);
234 }
235
236 /* Add a result to select data, return zero if something goes wrong. */
237 DWORD select_data_result_add (LPSELECTDATA lpSelectData, SELECTMODE EMode,
238 int lpOrigIdx)
239 {
240 DWORD res;
241 DWORD i;
242
243 res = 0;
244 if (lpSelectData->nResultsCount < MAXIMUM_SELECT_OBJECTS * 3)
245 {
246 i = lpSelectData->nResultsCount;
247 lpSelectData->aResults[i].EMode = EMode;
248 lpSelectData->aResults[i].lpOrigIdx = lpOrigIdx;
249 lpSelectData->nResultsCount++;
250 res = 1;
251 }
252
253 return res;
254 }
255
256 /* Add a query to select data, return zero if something goes wrong */
257 DWORD select_data_query_add (LPSELECTDATA lpSelectData,
258 SELECTMODE EMode,
259 HANDLE hFileDescr,
260 int lpOrigIdx,
261 unsigned int uFlagsFd)
262 {
263 DWORD res;
264 DWORD i;
265
266 res = 0;
267 if (lpSelectData->nQueriesCount < MAXIMUM_SELECT_OBJECTS)
268 {
269 i = lpSelectData->nQueriesCount;
270 lpSelectData->aQueries[i].EMode = EMode;
271 lpSelectData->aQueries[i].hFileDescr = hFileDescr;
272 lpSelectData->aQueries[i].lpOrigIdx = lpOrigIdx;
273 lpSelectData->aQueries[i].uFlagsFd = uFlagsFd;
274 lpSelectData->nQueriesCount++;
275 res = 1;
276 }
277
278 return res;
279 }
280
281 /* Search for a job that has available query slots and that match provided type.
282 * If none is found, create a new one. Return the corresponding SELECTDATA, and
283 * update provided SELECTDATA head, if required.
284 */
285 LPSELECTDATA select_data_job_search (LPSELECTDATA *lppSelectData,
286 SELECTTYPE EType)
287 {
288 LPSELECTDATA res;
289
290 res = NULL;
291
292 /* Search for job */
293 DEBUG_PRINT("Searching an available job for type %d", EType);
294 res = *lppSelectData;
295 while (
296 res != NULL
297 && !(
298 res->EType == EType
299 && res->nQueriesCount < MAXIMUM_SELECT_OBJECTS
300 )
301 )
302 {
303 res = LIST_NEXT(LPSELECTDATA, res);
304 }
305
306 /* No matching job found, create one */
307 if (res == NULL)
308 {
309 DEBUG_PRINT("No job for type %d found, create one", EType);
310 res = select_data_new(*lppSelectData, EType);
311 *lppSelectData = res;
312 }
313
314 return res;
315 }
316
317 /***********************/
318 /* Console */
319 /***********************/
320
321 void read_console_poll(HANDLE hStop, void *_data)
322 {
323 HANDLE events[2];
324 INPUT_RECORD record;
325 DWORD waitRes;
326 DWORD n;
327 LPSELECTDATA lpSelectData;
328 LPSELECTQUERY lpQuery;
329
330 DEBUG_PRINT("Waiting for data on console");
331
332 record;
333 waitRes = 0;
334 n = 0;
335 lpSelectData = (LPSELECTDATA)_data;
336 lpQuery = &(lpSelectData->aQueries[0]);
337
338 events[0] = hStop;
339 events[1] = lpQuery->hFileDescr;
340 while (lpSelectData->EState == SELECT_STATE_NONE)
341 {
342 waitRes = WaitForMultipleObjects(2, events, FALSE, INFINITE);
343 if (waitRes == WAIT_OBJECT_0
344 || check_error(lpSelectData, waitRes == WAIT_FAILED))
345 {
346 /* stop worker event or error */
347 break;
348 }
349 /* console event */
350 if (check_error(lpSelectData, PeekConsoleInput(lpQuery->hFileDescr,
351 &record, 1, &n)
352 == 0))
353 {
354 break;
355 }
356 /* check for ASCII keypress only */
357 if (record.EventType == KEY_EVENT &&
358 record.Event.KeyEvent.bKeyDown &&
359 record.Event.KeyEvent.uChar.AsciiChar != 0)
360 {
361 select_data_result_add(lpSelectData, lpQuery->EMode, lpQuery->lpOrigIdx);
362 lpSelectData->EState = SELECT_STATE_SIGNALED;
363 break;
364 }
365 else
366 {
367 /* discard everything else and try again */
368 if (check_error(lpSelectData, ReadConsoleInput(lpQuery->hFileDescr,
369 &record, 1, &n)
370 == 0))
371 {
372 break;
373 }
374 }
375 };
376 }
377
378 /* Add a function to monitor console input */
379 LPSELECTDATA read_console_poll_add (LPSELECTDATA lpSelectData,
380 SELECTMODE EMode,
381 HANDLE hFileDescr,
382 int lpOrigIdx,
383 unsigned int uFlagsFd)
384 {
385 LPSELECTDATA res;
386
387 res = select_data_new(lpSelectData, SELECT_TYPE_CONSOLE_READ);
388 res->funcWorker = read_console_poll;
389 select_data_query_add(res, SELECT_MODE_READ, hFileDescr, lpOrigIdx, uFlagsFd);
390
391 return res;
392 }
393
394 /***********************/
395 /* Pipe */
396 /***********************/
397
398 /* Monitor a pipe for input */
399 void read_pipe_poll (HANDLE hStop, void *_data)
400 {
401 DWORD res;
402 DWORD event;
403 DWORD n;
404 LPSELECTQUERY iterQuery;
405 LPSELECTDATA lpSelectData;
406 DWORD i;
407 DWORD wait;
408
409 /* Poll pipe */
410 event = 0;
411 n = 0;
412 lpSelectData = (LPSELECTDATA)_data;
413 wait = 1;
414
415 DEBUG_PRINT("Checking data pipe");
416 while (lpSelectData->EState == SELECT_STATE_NONE)
417 {
418 for (i = 0; i < lpSelectData->nQueriesCount; i++)
419 {
420 iterQuery = &(lpSelectData->aQueries[i]);
421 res = PeekNamedPipe(
422 iterQuery->hFileDescr,
423 NULL,
424 0,
425 NULL,
426 &n,
427 NULL);
428 if (check_error(lpSelectData,
429 (res == 0) &&
430 (GetLastError() != ERROR_BROKEN_PIPE)))
431 {
432 break;
433 };
434
435 if ((n > 0) || (res == 0))
436 {
437 lpSelectData->EState = SELECT_STATE_SIGNALED;
438 select_data_result_add(lpSelectData, iterQuery->EMode,
439 iterQuery->lpOrigIdx);
440 };
441 };
442
443 /* Alas, nothing except polling seems to work for pipes.
444 Check the state & stop_worker_event every 10 ms
445 */
446 if (lpSelectData->EState == SELECT_STATE_NONE)
447 {
448 event = WaitForSingleObject(hStop, wait);
449
450 /* Fast start: begin to wait 1, 2, 4, 8 and then 10 ms.
451 * If we are working with the output of a program there is
452 * a chance that one of the 4 first calls succeed.
453 */
454 wait = 2 * wait;
455 if (wait > 10)
456 {
457 wait = 10;
458 };
459 if (event == WAIT_OBJECT_0
460 || check_error(lpSelectData, event == WAIT_FAILED))
461 {
462 break;
463 }
464 }
465 }
466 DEBUG_PRINT("Finish checking data on pipe");
467 }
468
469 /* Add a function to monitor pipe input */
470 LPSELECTDATA read_pipe_poll_add (LPSELECTDATA lpSelectData,
471 SELECTMODE EMode,
472 HANDLE hFileDescr,
473 int lpOrigIdx,
474 unsigned int uFlagsFd)
475 {
476 LPSELECTDATA res;
477 LPSELECTDATA hd;
478
479 hd = lpSelectData;
480 /* Polling pipe is a non blocking operation by default. This means that each
481 worker can handle many pipe. We begin to try to find a worker that is
482 polling pipe, but for which there is under the limit of pipe per worker.
483 */
484 DEBUG_PRINT("Searching an available worker handling pipe");
485 res = select_data_job_search(&hd, SELECT_TYPE_PIPE_READ);
486
487 /* Add a new pipe to poll */
488 res->funcWorker = read_pipe_poll;
489 select_data_query_add(res, EMode, hFileDescr, lpOrigIdx, uFlagsFd);
490
491 return hd;
492 }
493
494 /***********************/
495 /* Socket */
496 /***********************/
497
498 /* Monitor socket */
499 void socket_poll (HANDLE hStop, void *_data)
500 {
501 LPSELECTDATA lpSelectData;
502 LPSELECTQUERY iterQuery;
503 HANDLE aEvents[MAXIMUM_SELECT_OBJECTS];
504 DWORD nEvents;
505 long maskEvents;
506 DWORD i;
507 u_long iMode;
508 SELECTMODE mode;
509 WSANETWORKEVENTS events;
510
511 lpSelectData = (LPSELECTDATA)_data;
512
513 DEBUG_PRINT("Worker has %d queries to service", lpSelectData->nQueriesCount);
514 for (nEvents = 0; nEvents < lpSelectData->nQueriesCount; nEvents++)
515 {
516 iterQuery = &(lpSelectData->aQueries[nEvents]);
517 aEvents[nEvents] = CreateEvent(NULL, TRUE, FALSE, NULL);
518 maskEvents = 0;
519 mode = iterQuery->EMode;
520 if ((mode & SELECT_MODE_READ) != 0)
521 {
522 DEBUG_PRINT("Polling read for %d", iterQuery->hFileDescr);
523 maskEvents |= FD_READ | FD_ACCEPT | FD_CLOSE;
524 }
525 if ((mode & SELECT_MODE_WRITE) != 0)
526 {
527 DEBUG_PRINT("Polling write for %d", iterQuery->hFileDescr);
528 maskEvents |= FD_WRITE | FD_CONNECT | FD_CLOSE;
529 }
530 if ((mode & SELECT_MODE_EXCEPT) != 0)
531 {
532 DEBUG_PRINT("Polling exceptions for %d", iterQuery->hFileDescr);
533 maskEvents |= FD_OOB;
534 }
535
536 check_error(lpSelectData,
537 WSAEventSelect(
538 (SOCKET)(iterQuery->hFileDescr),
539 aEvents[nEvents],
540 maskEvents) == SOCKET_ERROR);
541 }
542
543 /* Add stop event */
544 aEvents[nEvents] = hStop;
545 nEvents++;
546
547 if (lpSelectData->nError == 0)
548 {
549 check_error(lpSelectData,
550 WaitForMultipleObjects(
551 nEvents,
552 aEvents,
553 FALSE,
554 INFINITE) == WAIT_FAILED);
555 };
556
557 if (lpSelectData->nError == 0)
558 {
559 for (i = 0; i < lpSelectData->nQueriesCount; i++)
560 {
561 iterQuery = &(lpSelectData->aQueries[i]);
562 if (WaitForSingleObject(aEvents[i], 0) == WAIT_OBJECT_0)
563 {
564 DEBUG_PRINT("Socket %d has pending events", (i - 1));
565 if (iterQuery != NULL)
566 {
567 /* Find out what kind of events were raised
568 */
569 if (WSAEnumNetworkEvents((SOCKET)(iterQuery->hFileDescr),
570 aEvents[i], &events) == 0)
571 {
572 if ((iterQuery->EMode & SELECT_MODE_READ) != 0
573 && (events.lNetworkEvents & (FD_READ | FD_ACCEPT | FD_CLOSE))
574 != 0)
575 {
576 select_data_result_add(lpSelectData, SELECT_MODE_READ,
577 iterQuery->lpOrigIdx);
578 }
579 if ((iterQuery->EMode & SELECT_MODE_WRITE) != 0
580 && (events.lNetworkEvents & (FD_WRITE | FD_CONNECT | FD_CLOSE))
581 != 0)
582 {
583 select_data_result_add(lpSelectData, SELECT_MODE_WRITE,
584 iterQuery->lpOrigIdx);
585 }
586 if ((iterQuery->EMode & SELECT_MODE_EXCEPT) != 0
587 && (events.lNetworkEvents & FD_OOB) != 0)
588 {
589 select_data_result_add(lpSelectData, SELECT_MODE_EXCEPT,
590 iterQuery->lpOrigIdx);
591 }
592 }
593 }
594 }
595 /* WSAEventSelect() automatically sets socket to nonblocking mode.
596 Restore the blocking one. */
597 if (iterQuery->uFlagsFd & FLAGS_FD_IS_BLOCKING)
598 {
599 DEBUG_PRINT("Restore a blocking socket");
600 iMode = 0;
601 check_error(lpSelectData,
602 WSAEventSelect((SOCKET)(iterQuery->hFileDescr), aEvents[i], 0) != 0 ||
603 ioctlsocket((SOCKET)(iterQuery->hFileDescr), FIONBIO, &iMode) != 0);
604 }
605 else
606 {
607 check_error(lpSelectData,
608 WSAEventSelect((SOCKET)(iterQuery->hFileDescr), aEvents[i], 0) != 0);
609 };
610
611 CloseHandle(aEvents[i]);
612 aEvents[i] = INVALID_HANDLE_VALUE;
613 }
614 }
615 }
616
617 /* Add a function to monitor socket */
618 LPSELECTDATA socket_poll_add (LPSELECTDATA lpSelectData,
619 SELECTMODE EMode,
620 HANDLE hFileDescr,
621 int lpOrigIdx,
622 unsigned int uFlagsFd)
623 {
624 LPSELECTDATA res;
625 LPSELECTDATA candidate;
626 long i;
627 LPSELECTQUERY aQueries;
628
629 res = lpSelectData;
630 candidate = NULL;
631 aQueries = NULL;
632
633 /* Polling socket can be done multiple handle at the same time. You just
634 need one worker to use it. Try to find if there is already a worker
635 handling this kind of request.
636 Only one event can be associated with a given socket which means
637 that if a socket is in more than one of the fd_sets then we have
638 to find that particular query and update EMode with the
639 additional flag.
640 */
641 DEBUG_PRINT("Scanning list of worker to find one that already handle socket");
642 /* Search for job */
643 DEBUG_PRINT("Searching for an available job for type %d for descriptor %d",
644 SELECT_TYPE_SOCKET, hFileDescr);
645 while (res != NULL)
646 {
647 if (res->EType == SELECT_TYPE_SOCKET)
648 {
649 i = res->nQueriesCount - 1;
650 aQueries = res->aQueries;
651 while (i >= 0 && aQueries[i].hFileDescr != hFileDescr)
652 {
653 i--;
654 }
655 /* If we didn't find the socket but this worker has available
656 slots, store it
657 */
658 if (i < 0)
659 {
660 if ( res->nQueriesCount < MAXIMUM_SELECT_OBJECTS)
661 {
662 candidate = res;
663 }
664 res = LIST_NEXT(LPSELECTDATA, res);
665 }
666 else
667 {
668 /* Previous socket query located -- we're finished
669 */
670 aQueries = &aQueries[i];
671 break;
672 }
673 }
674 else
675 {
676 res = LIST_NEXT(LPSELECTDATA, res);
677 }
678 }
679
680 if (res == NULL)
681 {
682 res = candidate;
683
684 /* No matching job found, create one */
685 if (res == NULL)
686 {
687 DEBUG_PRINT("No job for type %d found, create one", SELECT_TYPE_SOCKET);
688 res = select_data_new(lpSelectData, SELECT_TYPE_SOCKET);
689 res->funcWorker = socket_poll;
690 res->nQueriesCount = 1;
691 aQueries = &res->aQueries[0];
692 }
693 else
694 {
695 aQueries = &(res->aQueries[res->nQueriesCount++]);
696 }
697 aQueries->EMode = EMode;
698 aQueries->hFileDescr = hFileDescr;
699 aQueries->lpOrigIdx = lpOrigIdx;
700 aQueries->uFlagsFd = uFlagsFd;
701 DEBUG_PRINT("Socket %x added", hFileDescr);
702 }
703 else
704 {
705 aQueries->EMode |= EMode;
706 DEBUG_PRINT("Socket %x updated to %d", hFileDescr, aQueries->EMode);
707 }
708
709 return res;
710 }
711
712 /***********************/
713 /* Static */
714 /***********************/
715
716 /* Add a static result */
717 LPSELECTDATA static_poll_add (LPSELECTDATA lpSelectData,
718 SELECTMODE EMode,
719 HANDLE hFileDescr,
720 int lpOrigIdx,
721 unsigned int uFlagsFd)
722 {
723 LPSELECTDATA res;
724 LPSELECTDATA hd;
725
726 /* Look for an already initialized static element */
727 hd = lpSelectData;
728 res = select_data_job_search(&hd, SELECT_TYPE_STATIC);
729
730 /* Add a new query/result */
731 select_data_query_add(res, EMode, hFileDescr, lpOrigIdx, uFlagsFd);
732 select_data_result_add(res, EMode, lpOrigIdx);
733
734 return hd;
735 }
736
737 /********************************/
738 /* Generic select data handling */
739 /********************************/
740
741 /* Guess handle type */
742 static SELECTHANDLETYPE get_handle_type(value fd)
743 {
744 DWORD mode;
745 SELECTHANDLETYPE res;
746
747 CAMLparam1(fd);
748
749 mode = 0;
750 res = SELECT_HANDLE_NONE;
751
752 if (Descr_kind_val(fd) == KIND_SOCKET)
753 {
754 res = SELECT_HANDLE_SOCKET;
755 }
756 else
757 {
758 switch(GetFileType(Handle_val(fd)))
759 {
760 case FILE_TYPE_DISK:
761 res = SELECT_HANDLE_DISK;
762 break;
763
764 case FILE_TYPE_CHAR: /* character file or a console */
765 if (GetConsoleMode(Handle_val(fd), &mode) != 0)
766 {
767 res = SELECT_HANDLE_CONSOLE;
768 }
769 else
770 {
771 res = SELECT_HANDLE_NONE;
772 };
773 break;
774
775 case FILE_TYPE_PIPE: /* a named or an anonymous pipe (socket
776 already handled) */
777 res = SELECT_HANDLE_PIPE;
778 break;
779 };
780 };
781
782 CAMLreturnT(SELECTHANDLETYPE, res);
783 }
784
785 /* Choose what to do with given data */
786 LPSELECTDATA select_data_dispatch (LPSELECTDATA lpSelectData, SELECTMODE EMode,
787 value fd, int lpOrigIdx)
788 {
789 LPSELECTDATA res;
790 HANDLE hFileDescr;
791 struct sockaddr sa;
792 int sa_len;
793 BOOL alreadyAdded;
794 unsigned int uFlagsFd;
795
796 CAMLparam1(fd);
797
798 res = lpSelectData;
799 hFileDescr = Handle_val(fd);
800 sa_len = sizeof(sa);
801 alreadyAdded = FALSE;
802 uFlagsFd = Flags_fd_val(fd);
803
804 DEBUG_PRINT("Begin dispatching handle %x", hFileDescr);
805
806 DEBUG_PRINT("Waiting for %d on handle %x", EMode, hFileDescr);
807
808 /* There is only 2 way to have except mode: transmission of OOB data through
809 a socket TCP/IP and through a strange interaction with a TTY.
810 With windows, we only consider the TCP/IP except condition
811 */
812 switch(get_handle_type(fd))
813 {
814 case SELECT_HANDLE_DISK:
815 DEBUG_PRINT("Handle %x is a disk handle", hFileDescr);
816 /* Disk is always ready in read/write operation */
817 if (EMode == SELECT_MODE_READ || EMode == SELECT_MODE_WRITE)
818 {
819 res = static_poll_add(res, EMode, hFileDescr, lpOrigIdx, uFlagsFd);
820 };
821 break;
822
823 case SELECT_HANDLE_CONSOLE:
824 DEBUG_PRINT("Handle %x is a console handle", hFileDescr);
825 /* Console is always ready in write operation, need to check for read. */
826 if (EMode == SELECT_MODE_READ)
827 {
828 res = read_console_poll_add(res, EMode, hFileDescr, lpOrigIdx,
829 uFlagsFd);
830 }
831 else if (EMode == SELECT_MODE_WRITE)
832 {
833 res = static_poll_add(res, EMode, hFileDescr, lpOrigIdx, uFlagsFd);
834 };
835 break;
836
837 case SELECT_HANDLE_PIPE:
838 DEBUG_PRINT("Handle %x is a pipe handle", hFileDescr);
839 /* Console is always ready in write operation, need to check for read. */
840 if (EMode == SELECT_MODE_READ)
841 {
842 DEBUG_PRINT("Need to check availability of data on pipe");
843 res = read_pipe_poll_add(res, EMode, hFileDescr, lpOrigIdx, uFlagsFd);
844 }
845 else if (EMode == SELECT_MODE_WRITE)
846 {
847 DEBUG_PRINT("No need to check availability of data on pipe, "
848 "write operation always possible");
849 res = static_poll_add(res, EMode, hFileDescr, lpOrigIdx, uFlagsFd);
850 };
851 break;
852
853 case SELECT_HANDLE_SOCKET:
854 DEBUG_PRINT("Handle %x is a socket handle", hFileDescr);
855 if (getsockname((SOCKET)hFileDescr, &sa, &sa_len) == SOCKET_ERROR)
856 {
857 if (WSAGetLastError() == WSAEINVAL)
858 {
859 /* Socket is not bound */
860 DEBUG_PRINT("Socket is not connected");
861 if (EMode == SELECT_MODE_WRITE || EMode == SELECT_MODE_READ)
862 {
863 res = static_poll_add(res, EMode, hFileDescr, lpOrigIdx, uFlagsFd);
864 alreadyAdded = TRUE;
865 }
866 }
867 }
868 if (!alreadyAdded)
869 {
870 res = socket_poll_add(res, EMode, hFileDescr, lpOrigIdx, uFlagsFd);
871 }
872 break;
873
874 default:
875 DEBUG_PRINT("Handle %x is unknown", hFileDescr);
876 win32_maperr(ERROR_INVALID_HANDLE);
877 uerror("select", Nothing);
878 break;
879 };
880
881 DEBUG_PRINT("Finish dispatching handle %x", hFileDescr);
882
883 CAMLreturnT(LPSELECTDATA, res);
884 }
885
886 static DWORD caml_list_length (value lst)
887 {
888 DWORD res;
889
890 CAMLparam1 (lst);
891 CAMLlocal1 (l);
892
893 for (res = 0, l = lst; l != Val_int(0); l = Field(l, 1), res++)
894 { }
895
896 CAMLreturnT(DWORD, res);
897 }
898
899 static value find_handle(LPSELECTRESULT iterResult, value readfds,
900 value writefds, value exceptfds)
901 {
902 CAMLparam3(readfds, writefds, exceptfds);
903 CAMLlocal2(result, list);
904 int i;
905
906 switch( iterResult->EMode )
907 {
908 case SELECT_MODE_READ:
909 list = readfds;
910 break;
911 case SELECT_MODE_WRITE:
912 list = writefds;
913 break;
914 case SELECT_MODE_EXCEPT:
915 list = exceptfds;
916 break;
917 case SELECT_MODE_NONE:
918 CAMLassert(0);
919 };
920
921 for(i=0; list != Val_unit && i < iterResult->lpOrigIdx; ++i )
922 {
923 list = Field(list, 1);
924 }
925
926 if (list == Val_unit)
927 caml_failwith ("select.c: original file handle not found");
928
929 result = Field(list, 0);
930
931 CAMLreturn( result );
932 }
933
934 #define MAX(a, b) ((a) > (b) ? (a) : (b))
935
936 /* Convert fdlist to an fd_set if all the handles in fdlist are
937 * sockets and return 1. Returns 0 if a non-socket value is
938 * encountered, or if there are more than FD_SETSIZE sockets.
939 */
940 static int fdlist_to_fdset(value fdlist, fd_set *fdset)
941 {
942 value l, c;
943 int n = 0;
944 FD_ZERO(fdset);
945 for (l = fdlist; l != Val_int(0); l = Field(l, 1)) {
946 if (++n > FD_SETSIZE) {
947 DEBUG_PRINT("More than FD_SETSIZE sockets");
948 return 0;
949 }
950 c = Field(l, 0);
951 if (Descr_kind_val(c) == KIND_SOCKET) {
952 FD_SET(Socket_val(c), fdset);
953 } else {
954 DEBUG_PRINT("Non socket value encountered");
955 return 0;
956 }
957 }
958 return 1;
959 }
960
961 static value fdset_to_fdlist(value fdlist, fd_set *fdset)
962 {
963 CAMLparam1(fdlist);
964 CAMLlocal2(res, s);
965 res = Val_int(0);
966 for (/*nothing*/; fdlist != Val_int(0); fdlist = Field(fdlist, 1)) {
967 s = Field(fdlist, 0);
968 if (FD_ISSET(Socket_val(s), fdset)) {
969 value newres = caml_alloc_small(2, 0);
970 Field(newres, 0) = s;
971 Field(newres, 1) = res;
972 res = newres;
973 }
974 }
975 CAMLreturn(res);
976 }
977
978 CAMLprim value unix_select(value readfds, value writefds, value exceptfds,
979 value timeout)
980 {
981 /* Event associated to handle */
982 DWORD nEventsCount;
983 DWORD nEventsMax;
984 HANDLE *lpEventsDone;
985
986 /* Data for all handles */
987 LPSELECTDATA lpSelectData;
988 LPSELECTDATA iterSelectData;
989
990 /* Iterator for results */
991 LPSELECTRESULT iterResult;
992
993 /* Iterator */
994 DWORD i;
995
996 /* Error status */
997 DWORD err;
998
999 /* Time to wait */
1000 DWORD milliseconds;
1001
1002 /* Is there static select data */
1003 BOOL hasStaticData = FALSE;
1004
1005 /* Wait return */
1006 DWORD waitRet;
1007
1008 /* Set of handle */
1009 SELECTHANDLESET hds;
1010 DWORD hdsMax;
1011 LPHANDLE hdsData;
1012
1013 /* Length of each list */
1014 DWORD readfds_len;
1015 DWORD writefds_len;
1016 DWORD exceptfds_len;
1017
1018 CAMLparam4 (readfds, writefds, exceptfds, timeout);
1019 CAMLlocal5 (read_list, write_list, except_list, res, l);
1020 CAMLlocal1 (fd);
1021
1022 fd_set read, write, except;
1023 double tm;
1024 struct timeval tv;
1025 struct timeval * tvp;
1026
1027 DEBUG_PRINT("in select");
1028
1029 err = 0;
1030 tm = Double_val(timeout);
1031 if (readfds == Val_int(0)
1032 && writefds == Val_int(0)
1033 && exceptfds == Val_int(0)) {
1034 DEBUG_PRINT("nothing to do");
1035 if ( tm > 0.0 ) {
1036 caml_enter_blocking_section();
1037 Sleep( (int)(tm * 1000));
1038 caml_leave_blocking_section();
1039 }
1040 read_list = write_list = except_list = Val_int(0);
1041 } else {
1042 if (fdlist_to_fdset(readfds, &read)
1043 && fdlist_to_fdset(writefds, &write)
1044 && fdlist_to_fdset(exceptfds, &except)) {
1045 DEBUG_PRINT("only sockets to select on, using classic select");
1046 if (tm < 0.0) {
1047 tvp = (struct timeval *) NULL;
1048 } else {
1049 tv.tv_sec = (int) tm;
1050 tv.tv_usec = (int) (1e6 * (tm - (int) tm));
1051 tvp = &tv;
1052 }
1053 caml_enter_blocking_section();
1054 if (select(FD_SETSIZE, &read, &write, &except, tvp) == -1) {
1055 err = WSAGetLastError();
1056 DEBUG_PRINT("Error %ld occurred", err);
1057 }
1058 caml_leave_blocking_section();
1059 if (err) {
1060 DEBUG_PRINT("Error %ld occurred", err);
1061 win32_maperr(err);
1062 uerror("select", Nothing);
1063 }
1064 read_list = fdset_to_fdlist(readfds, &read);
1065 write_list = fdset_to_fdlist(writefds, &write);
1066 except_list = fdset_to_fdlist(exceptfds, &except);
1067 } else {
1068 nEventsCount = 0;
1069 nEventsMax = 0;
1070 lpEventsDone = NULL;
1071 lpSelectData = NULL;
1072 iterSelectData = NULL;
1073 iterResult = NULL;
1074 hasStaticData = 0;
1075 waitRet = 0;
1076 readfds_len = caml_list_length(readfds);
1077 writefds_len = caml_list_length(writefds);
1078 exceptfds_len = caml_list_length(exceptfds);
1079 hdsMax = MAX(readfds_len, MAX(writefds_len, exceptfds_len));
1080
1081 hdsData = (HANDLE *)caml_stat_alloc(sizeof(HANDLE) * hdsMax);
1082
1083 if (tm >= 0.0)
1084 {
1085 milliseconds = 1000 * tm;
1086 DEBUG_PRINT("Will wait %d ms", milliseconds);
1087 }
1088 else
1089 {
1090 milliseconds = INFINITE;
1091 }
1092
1093
1094 /* Create list of select data, based on the different list of fd
1095 to watch */
1096 DEBUG_PRINT("Dispatch read fd");
1097 handle_set_init(&hds, hdsData, hdsMax);
1098 i=0;
1099 for (l = readfds; l != Val_int(0); l = Field(l, 1))
1100 {
1101 fd = Field(l, 0);
1102 if (!handle_set_mem(&hds, Handle_val(fd)))
1103 {
1104 handle_set_add(&hds, Handle_val(fd));
1105 lpSelectData = select_data_dispatch(lpSelectData,
1106 SELECT_MODE_READ, fd, i++);
1107 }
1108 else
1109 {
1110 DEBUG_PRINT("Discarding handle %x which is already monitor "
1111 "for read", Handle_val(fd));
1112 }
1113 }
1114 handle_set_reset(&hds);
1115
1116 DEBUG_PRINT("Dispatch write fd");
1117 handle_set_init(&hds, hdsData, hdsMax);
1118 i=0;
1119 for (l = writefds; l != Val_int(0); l = Field(l, 1))
1120 {
1121 fd = Field(l, 0);
1122 if (!handle_set_mem(&hds, Handle_val(fd)))
1123 {
1124 handle_set_add(&hds, Handle_val(fd));
1125 lpSelectData = select_data_dispatch(lpSelectData,
1126 SELECT_MODE_WRITE, fd, i++);
1127 }
1128 else
1129 {
1130 DEBUG_PRINT("Discarding handle %x which is already monitor "
1131 "for write", Handle_val(fd));
1132 }
1133 }
1134 handle_set_reset(&hds);
1135
1136 DEBUG_PRINT("Dispatch exceptional fd");
1137 handle_set_init(&hds, hdsData, hdsMax);
1138 i=0;
1139 for (l = exceptfds; l != Val_int(0); l = Field(l, 1))
1140 {
1141 fd = Field(l, 0);
1142 if (!handle_set_mem(&hds, Handle_val(fd)))
1143 {
1144 handle_set_add(&hds, Handle_val(fd));
1145 lpSelectData = select_data_dispatch(lpSelectData,
1146 SELECT_MODE_EXCEPT, fd, i++);
1147 }
1148 else
1149 {
1150 DEBUG_PRINT("Discarding handle %x which is already monitor "
1151 "for exceptional", Handle_val(fd));
1152 }
1153 }
1154 handle_set_reset(&hds);
1155
1156 /* Building the list of handle to wait for */
1157 DEBUG_PRINT("Building events done array");
1158 nEventsMax = list_length((LPLIST)lpSelectData);
1159 nEventsCount = 0;
1160 lpEventsDone = (HANDLE *)caml_stat_alloc(sizeof(HANDLE) * nEventsMax);
1161
1162 iterSelectData = lpSelectData;
1163 while (iterSelectData != NULL)
1164 {
1165 /* Check if it is static data. If this is the case, launch everything
1166 * but don't wait for events. It helps to test if there are events on
1167 * any other fd (which are not static), knowing that there is at least
1168 * one result (the static data).
1169 */
1170 if (iterSelectData->EType == SELECT_TYPE_STATIC)
1171 {
1172 hasStaticData = TRUE;
1173 };
1174
1175 /* Execute APC */
1176 if (iterSelectData->funcWorker != NULL)
1177 {
1178 iterSelectData->lpWorker =
1179 worker_job_submit(
1180 iterSelectData->funcWorker,
1181 (void *)iterSelectData);
1182 DEBUG_PRINT("Job submitted to worker %x",
1183 iterSelectData->lpWorker);
1184 lpEventsDone[nEventsCount]
1185 = worker_job_event_done(iterSelectData->lpWorker);
1186 nEventsCount++;
1187 };
1188 iterSelectData = LIST_NEXT(LPSELECTDATA, iterSelectData);
1189 };
1190
1191 DEBUG_PRINT("Need to watch %d workers", nEventsCount);
1192
1193 /* Processing select itself */
1194 caml_enter_blocking_section();
1195 /* There are worker started, waiting to be monitored */
1196 if (nEventsCount > 0)
1197 {
1198 /* Waiting for event */
1199 if (err == 0 && !hasStaticData)
1200 {
1201 DEBUG_PRINT("Waiting for one select worker to be done");
1202 switch (WaitForMultipleObjects(nEventsCount, lpEventsDone, FALSE,
1203 milliseconds))
1204 {
1205 case WAIT_FAILED:
1206 err = GetLastError();
1207 break;
1208
1209 case WAIT_TIMEOUT:
1210 DEBUG_PRINT("Select timeout");
1211 break;
1212
1213 default:
1214 DEBUG_PRINT("One worker is done");
1215 break;
1216 };
1217 }
1218
1219 /* Ordering stop to every worker */
1220 DEBUG_PRINT("Sending stop signal to every select workers");
1221 iterSelectData = lpSelectData;
1222 while (iterSelectData != NULL)
1223 {
1224 if (iterSelectData->lpWorker != NULL)
1225 {
1226 worker_job_stop(iterSelectData->lpWorker);
1227 };
1228 iterSelectData = LIST_NEXT(LPSELECTDATA, iterSelectData);
1229 };
1230
1231 DEBUG_PRINT("Waiting for every select worker to be done");
1232 switch (WaitForMultipleObjects(nEventsCount, lpEventsDone, TRUE,
1233 INFINITE))
1234 {
1235 case WAIT_FAILED:
1236 err = GetLastError();
1237 break;
1238
1239 default:
1240 DEBUG_PRINT("Every worker is done");
1241 break;
1242 }
1243 }
1244 /* Nothing to monitor but some time to wait. */
1245 else if (!hasStaticData)
1246 {
1247 Sleep(milliseconds);
1248 }
1249 caml_leave_blocking_section();
1250
1251 DEBUG_PRINT("Error status: %d (0 is ok)", err);
1252 /* Build results */
1253 if (err == 0)
1254 {
1255 DEBUG_PRINT("Building result");
1256 read_list = Val_unit;
1257 write_list = Val_unit;
1258 except_list = Val_unit;
1259
1260 iterSelectData = lpSelectData;
1261 while (iterSelectData != NULL)
1262 {
1263 for (i = 0; i < iterSelectData->nResultsCount; i++)
1264 {
1265 iterResult = &(iterSelectData->aResults[i]);
1266 l = caml_alloc_small(2, 0);
1267 Field(l, 0) = find_handle(iterResult, readfds, writefds,
1268 exceptfds);
1269 switch (iterResult->EMode)
1270 {
1271 case SELECT_MODE_READ:
1272 Field(l, 1) = read_list;
1273 read_list = l;
1274 break;
1275 case SELECT_MODE_WRITE:
1276 Field(l, 1) = write_list;
1277 write_list = l;
1278 break;
1279 case SELECT_MODE_EXCEPT:
1280 Field(l, 1) = except_list;
1281 except_list = l;
1282 break;
1283 case SELECT_MODE_NONE:
1284 CAMLassert(0);
1285 }
1286 }
1287 /* We try to only process the first error, bypass other errors */
1288 if (err == 0 && iterSelectData->EState == SELECT_STATE_ERROR)
1289 {
1290 err = iterSelectData->nError;
1291 }
1292 iterSelectData = LIST_NEXT(LPSELECTDATA, iterSelectData);
1293 }
1294 }
1295
1296 /* Free resources */
1297 DEBUG_PRINT("Free selectdata resources");
1298 iterSelectData = lpSelectData;
1299 while (iterSelectData != NULL)
1300 {
1301 lpSelectData = iterSelectData;
1302 iterSelectData = LIST_NEXT(LPSELECTDATA, iterSelectData);
1303 select_data_free(lpSelectData);
1304 }
1305 lpSelectData = NULL;
1306
1307 /* Free allocated events/handle set array */
1308 DEBUG_PRINT("Free local allocated resources");
1309 caml_stat_free(lpEventsDone);
1310 caml_stat_free(hdsData);
1311
1312 DEBUG_PRINT("Raise error if required");
1313 if (err != 0)
1314 {
1315 win32_maperr(err);
1316 uerror("select", Nothing);
1317 }
1318 }
1319 }
1320
1321 DEBUG_PRINT("Build final result");
1322 res = caml_alloc_small(3, 0);
1323 Field(res, 0) = read_list;
1324 Field(res, 1) = write_list;
1325 Field(res, 2) = except_list;
1326
1327 DEBUG_PRINT("out select");
1328
1329 CAMLreturn(res);
1330 }
1331