(* Copyright (C) 1989, Digital Equipment Corporation *) (* All rights reserved. *) (* See the file COPYRIGHT for a full description. *) (* Last modified on Mon Sep 14 10:35:15 PDT 1992 by rustan *) UNSAFE MODULE Synch EXPORTS Thread; IMPORT ThreadF; FROM System IMPORT EnterSystemCritical, ExitSystemCritical; (* ------------------------------ Semaphore ------------------------------ *) TYPE Semaphore = RECORD count: INTEGER := 0; q: T := NIL END; SemaphoreNode = RECORD sem: Semaphore; next: SemaphoreList := NIL END; SemaphoreList = UNTRACED REF SemaphoreNode; PROCEDURE P( VAR sem: Semaphore ) = (* Almost-invariant: MAX( -sem.count, 0 ) processes are waiting (see Alert for exception) *) BEGIN EnterSystemCritical(); DEC( sem.count ); IF sem.count < 0 THEN ThreadF.Suspend( sem.q ) END; ExitSystemCritical() END P; PROCEDURE AlertP( VAR sem: Semaphore ) RAISES {Alerted} = (* like P, but if current thread is alerted at the time of call or sometime during the wait, the P operation fails and Alerted is raised *) (* MODIFIES count[sem], alerted[CURRENT] WHEN NOT count[sem] OR alerted[CURRENT] ENSURES (RAISE = RETURN AND count[sem] > 0 AND count'[sem] = count[sem]-1 AND UNCHANGED(alerted)) OR (RAISE = Alerted AND alerted[CURRENT] AND NOT alerted'[CURRENT] AND UNCHANGED(count)) *) BEGIN TRY EnterSystemCritical(); DEC( sem.count ); IF sem.count < 0 THEN ThreadF.AlertSuspend( ADR( sem.q )) END; (* note, this implementation always gives precedence to alerts *) IF ThreadF.TestAndClearAlert() THEN INC( sem.count ); (* P operation did not succeed *) RAISE Alerted END FINALLY ExitSystemCritical() END END AlertP; PROCEDURE V( VAR sem: Semaphore ) = BEGIN EnterSystemCritical(); INC( sem.count ); (* Note, if it weren't for alerts, the following test "sem.q # NIL" would be the same as "sem.count < 0". See Thread.Alert. *) IF sem.q # NIL THEN ThreadF.ScheduleOne( sem.q ) END; ExitSystemCritical() END V; (* ------------------------------ Condition ------------------------------ *) REVEAL Condition = BRANDED "Mosaic Condition" OBJECT sem: Semaphore := Semaphore { count := 1 }; (* to protect q *) q: SemaphoreList := NIL END; PROCEDURE Wait( m: Mutex; c: Condition ) = (* REQUIRES holder[m] = CURRENT MODIFIES holder[m], waiting[c] COMPOSITION OF Enqueue; Resume END ACTION Enqueue ENSURES holder'[m] = none AND waiting'[c] = waiting[c] + {CURRENT} ACTION Resume WHEN holder[m] = none AND NOT (CURRENT IN waiting[c]) ENSURES holder'[m] = CURRENT AND UNCHANGED(waiting) *) VAR node: SemaphoreNode; (* semaphore initialized to 0 *) BEGIN EnqueueSem( c, node ); Release( m ); (* here's where REQUIRES holder[m]=CURRENT comes into play *) P( node.sem ); Acquire( m ) END Wait; PROCEDURE AlertWait( m: Mutex; c: Condition ) RAISES {Alerted} = (* Like Wait, but if the thread is marked alerted at the time of call or sometime during the wait, lock m and raise Alerted. *) (* REQUIRES holder[m] = CURRENT MODIFIES holder[m], waiting[c], alerted[CURRENT] PRIVATE VAR alertChosen: BOOLEAN COMPOSITION OF Enqueue; ChooseOutcome; GetMutex END ACTION Enqueue ENSURES holder'[m] = none AND waiting'[c] = waiting[c] + {CURRENT} AND UNCHANGED(alerted) ACTION ChooseOutcome WHEN NOT (CURRENT IN waiting[c]) OR alerted[CURRENT] ENSURES alertChosen'[c] = NOT (CURRENT IN waiting[c]) AND waiting'[c] = delete(CURRENT, waiting[c]) AND alerted'[CURRENT] = (alerted[CURRENT] AND NOT alertChosen') AND UNCHANGED(holder) ACTION GetMutex WHEN holder[m] = none ENSURES RAISE = (IF alertChosen THEN Alerted ELSE RETURN) AND holder'[m] = CURRENT AND UNCHANGED(waiting, alerted) *) VAR node: SemaphoreNode; (* semaphore initialized to 0 *) BEGIN EnqueueSem( c, node ); TRY Release( m ); (* this REQUIRES holder[m]=CURRENT *) AlertP( node.sem ) FINALLY Acquire( m ) END END AlertWait; PROCEDURE EnqueueSem( c: Condition; READONLY node: SemaphoreNode ) = (* add semaphore node 'node' to the end of 'c's semaphore list *) BEGIN (* under c.sem, add semaphore to end of c's semaphores list *) P( c.sem ); VAR p := c.q; back: SemaphoreList := NIL; BEGIN WHILE p # NIL DO back := p; p := p.next END; IF back = NIL THEN c.q := ADR( node ) ELSE back.next := ADR( node ) END END; V( c.sem ); END EnqueueSem; PROCEDURE Signal( c: Condition ) = (* MODIFIES waiting[c] ENSURES waiting'[c] = {} OR waiting'[c] < waiting[c] *) VAR top: SemaphoreList; BEGIN (* under c.sem, remove top semaphore from c's semaphore list *) P( c.sem ); top := c.q; IF top # NIL THEN c.q := top.next END; V( c.sem ); (* signal the first waiting thread, if any *) IF top # NIL THEN V( top.sem ) END END Signal; PROCEDURE Broadcast( c: Condition ) = (* MODIFIES waiting[c] ENSURES waiting'[c] = {} *) VAR list: SemaphoreList; BEGIN (* under c.sem, remove c's semaphore list *) P( c.sem ); list := c.q; c.q := NIL; V( c.sem ); (* signal each waiting thread *) WHILE list # NIL DO V( list.sem ); list := list.next END END Broadcast; (* ------------------------------ Mutex ------------------------------ *) REVEAL Mutex = BRANDED "Mosaic Mutex" OBJECT sem: Semaphore := Semaphore { count := 1 } END; PROCEDURE Acquire( m: Mutex ) = (* MODIFIES holder[m] WHEN holder[m] = none ENSURES holder'[m] = CURRENT *) BEGIN P( m.sem ) END Acquire; PROCEDURE Release( m: Mutex ) = (* REQUIRES holder[m] = CURRENT MODIFIES holder[m] ENSURES holder'[m] = none *) BEGIN (* Note. The REQUIRES clause is not checked. According to section 5.3.1 in SPwM3, the REQUIRES clause "does not constrain the implementation to any particular behavior if the precondition is not satisfied". *) V( m.sem ) END Release; BEGIN END Synch.