(* Copyright (C) 1989, Digital Equipment Corporation *) (* All rights reserved. *) (* See the file COPYRIGHT for a full description. *) (* Last modified on Mon Sep 14 10:35:15 PDT 1992 by rustan *) UNSAFE MODULE Thread EXPORTS Thread, ThreadF; IMPORT RTMisc, RTRegisters, RTStack, FloatMode, RTHeap, RTMemory; IMPORT System, Word, RTStackRep, IPC; FROM System IMPORT EnterSystemCritical, ExitSystemCritical; (* ------------------------------ T ------------------------------ *) REVEAL T = BRANDED "Mosaic Thread" OBJECT done: BOOLEAN := FALSE; joinPerformed: BOOLEAN := FALSE; stack: RTStack.T := NIL; stackLow, stackHigh: ADDRESS := NIL; registers: RTRegisters.T; closure: Closure := NIL; result: REFANY := NIL; waitingForJoin: T := NIL; handlers: ADDRESS := NIL; alertPending: BOOLEAN := FALSE; alertableInQ: UNTRACED REF T := NIL; floatState: FloatMode.ThreadState; next: T := NIL (* so that linked lists of Threads can be formed easily *) END; VAR oldThread: T := NIL; (* When a thread completes its execution, oldThread will be set to that thread. Then, when the next threads starts executing (which always happens after the RTRegisters.Save in SwitchContext or at the beginning of StartThread), the old thread's stack is disposed of. *) CONST defaultStackSize = 200; (* these are in number of words *) minimumStackSize = 50; (* Note, the size of the main thread's stack is recorded as ThreadF.MainStackSize. *) PROCEDURE Fork( cl: Closure ): T = (* Returns a handle on a newly-created thread executing cl.apply() *) VAR t := NEW( T, closure := cl ); size: CARDINAL := defaultStackSize; BEGIN TYPECASE cl OF SizedClosure (scl) => IF scl.stackSize > minimumStackSize THEN size := scl.stackSize END ELSE (* skip *) END; FloatMode.InitThread( t.floatState ); t.stack := RTStack.New( t, size, t.stackLow, t.stackHigh ); RTRegisters.New( t.registers, t.stackHigh, ThreadStart ); EnterSystemCritical(); (* Add the new thread to the list of active threads *) Schedule( t ); ExitSystemCritical(); RETURN t END Fork; PROCEDURE ThreadStart() = (* REQUIRES inSystemCritical *) (* This procedure is the first one called for a new thread. *) BEGIN (* Check to see if the previous thread's stack needs to be disposed of. *) IF oldThread # NIL THEN RTStack.Dispose( oldThread.stack ); oldThread := NIL END; ExitSystemCritical(); self.result := self.closure.apply(); EnterSystemCritical(); self.done := TRUE; (* schedule any process that may be waiting to join *) Schedule( self.waitingForJoin ); self.waitingForJoin := NIL; (* switch contexts to the next scheduled thread *) VAR old := self; BEGIN self := self.next; SwitchContext( old, self ) END; <* ASSERT FALSE *> (* control should never return *) END ThreadStart; PROCEDURE Join( t: T ): REFANY = (* Wait until t has terminated and return its result. *) (* It is a checked runtime error to call Join(t) or AlertJoin(t) if some previous call on the same t to either of these routines is in progress or has returned without raising an exception. *) BEGIN EnterSystemCritical(); IF t.joinPerformed THEN RTMisc.FatalError( RTMisc.Fault.JoinCalledTwice ) END; t.joinPerformed := TRUE; IF NOT t.done THEN Suspend( t.waitingForJoin ) END; ExitSystemCritical(); RETURN t.result END Join; PROCEDURE AlertJoin( t: T ): REFANY RAISES {Alerted} = (* Like Join, but if t is marked alerted at the time of call or sometime during the wait, lock m and raise Alerted. *) (* It is a checked runtime error to call Join(t) or AlertJoin(t) if some previous call on the same t to either of these routines is in progress or has returned without raising an exception. *) BEGIN TRY EnterSystemCritical(); IF t.joinPerformed THEN RTMisc.FatalError( RTMisc.Fault.JoinCalledTwice ) END; t.joinPerformed := TRUE; IF NOT t.done THEN AlertSuspend( ADR( t.waitingForJoin )) END; IF TestAndClearAlert() THEN t.joinPerformed := FALSE; RAISE Alerted END FINALLY ExitSystemCritical() END; RETURN t.result END AlertJoin; PROCEDURE Self(): T = BEGIN RETURN self END Self; PROCEDURE MyFPState (): UNTRACED REF FloatMode.ThreadState = BEGIN RETURN ADR( self.floatState ) END MyFPState; PROCEDURE SaveThreadState( t: T ) = (* REQUIRES inSystemCritical AND t is the thread running in the user sequence *) BEGIN <* ASSERT System.InSystemCritical() AND t # NIL AND NOT inLimbo *> VAR p: T; BEGIN p := self; <* ASSERT p = gcThread *> p := p.next; <* ASSERT p = systemThread *> p := p.next; <* ASSERT p = t *> END; RTRegisters.SaveOtherSequence( t.registers ) END SaveThreadState; PROCEDURE GetRecordedStackPointer( t: T ): ADDRESS = BEGIN RETURN RTRegisters.GetStackPointer( t.registers ) END GetRecordedStackPointer; PROCEDURE GetRegisterBounds( t: T; VAR low, high: UNTRACED REF Word.T ) = BEGIN (* Note that all saved registers are within the returned bounds. If the thread's state has been saved using RTRegisters.Save, then this includes the superfluous stack, frame, and program counters. However, if the thread's state has been saved using RTRegisters.SaveOtherSequence, then it contains r0 in the place of one of those. *) WITH regs = t.registers DO low := ADR( regs[ 0 ] ); high := ADR( regs[ LAST( regs ) ] ) END END GetRegisterBounds; PROCEDURE Alert( t: T ) = (* MODIFIES alerted[t] ENSURES alerted'[t] *) BEGIN EnterSystemCritical(); t.alertPending := TRUE; IF t.alertableInQ # NIL THEN (* t is alertable, and t.alertableInQ is a pointer to the queue in which t is waiting. In particular, such a queue may be the q field in a semaphore, if thread t is waiting inside AlertP, or the waitingForJoin field within some thread, if t is waiting inside AlertJoin. The code below will remove t from this queue and schedule t. The case of waiting for a semaphore is explained here. Note that t is removed from the queue, but the semaphore count is not modified. This means that the semaphore almost-invariant "MAX( -sem.count, 0 ) processes are waiting" is falsified. However, since t is now being scheduled, it will reach the front of the active queue eventually. When it does, it will continue executing within the AlertP procedure from where it was suspended. There, it will be detected that the thread was alerted and the count will be incremented, again restoring the almost-invariant. *) VAR p := t.alertableInQ^; back: T := NIL; BEGIN WHILE p # t DO back := p; p := p.next END; IF back = NIL THEN t.alertableInQ^ := t.next ELSE back.next := t.next END END; t.next := NIL; t.alertableInQ := NIL; Schedule( t ) END; ExitSystemCritical() END Alert; PROCEDURE TestAlert(): BOOLEAN = (* MODIFIES alerted[CURRENT] ENSURES ( RESULT ==> alerted[CURRENT] ) AND ( alerted'[CURRENT] = ( alerted[CURRENT] AND NOT RESULT )) *) VAR result: BOOLEAN; BEGIN EnterSystemCritical(); result := self.alertPending; self.alertPending := FALSE; ExitSystemCritical(); RETURN result END TestAlert; PROCEDURE TestAndClearAlert(): BOOLEAN = (* REQUIRES inSystemCritical *) (* identical to TestAlert, except for the above REQUIRES *) VAR result: BOOLEAN; BEGIN result := self.alertPending; self.alertPending := FALSE; RETURN result END TestAndClearAlert; (* ------------------------------ scheduling ------------------------------ *) PROCEDURE Yield() = (* REQUIRES NOT inSystemCritical AND self # gcThread *) (* Yield furnishes user invoked preemption *) BEGIN EnterSystemCritical(); IF self.next # NIL THEN Suspend( self ) END; ExitSystemCritical() END Yield; PROCEDURE Schedule( t: T ) = (* REQUIRES inSystemCritical *) (* REQUIRES self # NIL *) (* Adds a list 't' of previously non-active threads to the end of the list of active threads. 't' is the head of the list. The given list must not contain any cycle, and no thread on it must already be active. *) VAR tt: T := self.next; back: T := self; BEGIN IF t = NIL THEN RETURN END; WHILE tt # NIL DO back := tt; tt := tt.next END; back.next := t END Schedule; PROCEDURE ScheduleOne( VAR q: T ) = (* REQUIRES inSystemCritical *) (* REQUIRES self # NIL AND q # NIL *) (* Removes the first thread from q, and schedules it. *) VAR t := q; BEGIN <* ASSERT t # NIL *> q := t.next; t.next := NIL; Schedule( t ) END ScheduleOne; PROCEDURE Suspend( VAR q: T ) = (* REQUIRES inSystemCritical AND ( "ADR"(q) = "ADR"(self) ==> self.next # NIL ) *) (* Suspend the current thread, adding 'self' to end of Thread.T list 'q'. Control is passed to the first waiting process in the active queue. *) (* Note. This procedure also works when q is passed in as 'self', provided that self.next # NIL. Then, the procedure has the effect of yielding to the next waiting process. *) (* If self.alertableInQ is passed in as non-NIL, this procedure may return as a result of the thread being alerted. In particular, if the thread is alerted at the time of call or sometime during the wait, this procedure may return. The former of these conditions is checked within this procedure, and the latter within the Alert procedure. The caller can detect if the return was due to an alerted by testing self.alertPending on exit. *) VAR nextActive := self.next; (* just in case 'q' is passed in as 'self' *) old := self; BEGIN IF self.alertableInQ # NIL AND self.alertPending THEN RETURN END; (* Add 'self' to end of given queue *) VAR t := q; back: T := NIL; BEGIN WHILE t # NIL DO back := t; t := t.next END; IF back = NIL THEN q := self ELSE back.next := self END; self.next := NIL END; (* Assign the next active thread as the running one *) self := nextActive; SwitchContext( old, self) END Suspend; PROCEDURE AlertSuspend( pq: UNTRACED REF T ) RAISES {Alerted} = (* REQUIRES inSystemCritical *) BEGIN self.alertableInQ := pq; Suspend( pq^ ); self.alertableInQ := NIL END AlertSuspend; PROCEDURE InvokeGarbageCollector() = (* REQUIRES inSystemCritical *) (* REQUIRES gcThread # NIL (that is, the garbage collector thread has been initialized) *) (* REQUIRES self # gcThread AND inSystemCritical *) (* This procedure invokes the garbage collector. Although the garbage collector runs in its own thread, this procedure calls behaves to the caller as a procedure call; once the garbage collector finishes, the caller thread becomes the running one again. *) BEGIN <* ASSERT System.InSystemCritical() AND gcThread # NIL AND self # gcThread AND gcThread.next = NIL AND NOT gcThread.done AND userSequenceThread = NIL AND NOT gcInvokedFromSystemThread *> IF self = systemThread THEN gcInvokedFromSystemThread := TRUE; IF NOT inLimbo THEN userSequenceThread := systemThread.next; <* ASSERT userSequenceThread # NIL *> END END; gcThread.next := self; self := gcThread; IPC.EmptyAvailLists(); SwitchContext( self.next, self ); gcInvokedFromSystemThread := FALSE; userSequenceThread := NIL END InvokeGarbageCollector; PROCEDURE YieldFromCollector() = (* REQUIRES self = gcThread AND inSystemCritical *) (* called by collector upon completion of garbage collection *) BEGIN <* ASSERT System.InSystemCritical() *> <* ASSERT self = gcThread *> <* ASSERT gcThread.next # NIL AND NOT gcThread.done *> self := self.next; gcThread.next := NIL; SwitchContext( gcThread, self ) END YieldFromCollector; PROCEDURE SwitchToSystemContext( isr: Word.T ) = (* REQUIRES NOT inSystemCritical AND in system sequence AND self # systemThread AND NOT RTStackRep.doStackOverflowChecks *) (* On entry, stack checks must be turned off, since the 'currStack' variables have not yet been set up. As soon as they are, this procedure turns stack checks back on. Stack overflow checks will be on at exit from this procedure. *) VAR oldSelf: T := self; oldStackLow, oldStackHigh, oldHandlers: ADDRESS; BEGIN systemThread.next := self; IF inLimbo THEN oldHandlers := currentHandlers; RTStack.GetCurrentStackLimits( oldStackLow, oldStackHigh ) ELSE self.handlers := currentHandlers END; currentHandlers := NIL; self := systemThread; RTStack.SetCurrentStackLimits( self.stackLow, self.stackHigh ); RTStackRep.doStackOverflowChecks := TRUE; IPC.Interrupt( isr ); (* Note, the registers of the system thread are uninteresting from this point on in the eyes of garbage collection; thus, they are not saved. (See also RTStack.GetBounds.) *) (* the registers are saved so that the stored away sp will be right for future calls to the garbage collector *) <* ASSERT systemThread.handlers = NIL *> self := systemThread.next; systemThread.next := NIL; IF NOT inLimbo THEN <* ASSERT oldSelf = self *> currentHandlers := self.handlers; RTStack.SetCurrentStackLimits( self.stackLow, self.stackHigh ) ELSE <* ASSERT oldSelf = NIL OR oldSelf = self *> currentHandlers := oldHandlers; RTStack.SetCurrentStackLimits( oldStackLow, oldStackHigh ) END END SwitchToSystemContext; VAR inLimbo: BOOLEAN := FALSE; PROCEDURE SwitchContext( from, to: T ) = (* REQUIRES inSystemCritical *) (* REQUIRES self = to AND from # to *) (* Switches context from 'from' to 'to'. Assumes that 'self' has already been assigned the value 'to'. If from.done = TRUE, then from's stack will be disposed of. This procedure (and this procedure alone) deals with the 'handlers' field. *) (* 'to' may be passed in as NIL. If so, this procedure busy waits until a thread becomes ready. This may result in an infinite looph. *) BEGIN IF to = NIL THEN <* ASSERT self = NIL *> RTRegisters.SaveOtherSequence( from.registers ); (* in case the garbage collector is invoked *) WHILE Self() = NIL DO inLimbo := TRUE; ExitSystemCritical(); (* allow for interrupts to happen momentarily *) EnterSystemCritical(); inLimbo := FALSE END END; IF from.done THEN <* ASSERT oldThread = NIL *> oldThread := from END; IF RTRegisters.Save( from.registers ) = 0 THEN from.handlers := currentHandlers; currentHandlers := self.handlers; RTStack.SetCurrentStackLimits( self.stackLow, self.stackHigh ); RTRegisters.Restore( self.registers ) ELSE (* A thread is resumed. Check to see if the previous thread's stack needs to be disposed of. *) IF oldThread # NIL THEN RTStack.Dispose( oldThread.stack ); oldThread := NIL END END END SwitchContext; (* ------------------------------ main ------------------------------ *) BEGIN VAR t := NEW( T ); BEGIN (* initialize the top thread, which is the first thread to run *) FloatMode.InitThread( t.floatState ); t.stackLow := RTMemory.MainStackLow; t.stackHigh := RTMemory.MainStackHigh; mainThread := t; self := mainThread; END; VAR t := NEW( T ); BEGIN (* initialize the garbage collector thread *) FloatMode.InitThread( t.floatState ); t.stackLow := RTMemory.GcStackLow; t.stackHigh := RTMemory.GcStackHigh; RTRegisters.New( t.registers, t.stackHigh, RTHeap.Collect ); gcThread := t END; VAR t := NEW( T ); BEGIN (* initialize the system sequence thread *) FloatMode.InitThread( t.floatState ); t.stackLow := RTMemory.SystemStackLow; t.stackHigh := RTMemory.SystemStackHigh; (* Note, the pc register is not important in the following call, since the thread itself will get its initial pc setup from the bootstrap, and since no other code ever inspects the pc stored as a RTRegisters.T. *) RTRegisters.New( t.registers, t.stackHigh, NIL ); systemThread := t END END Thread.