(* Copyright (C) 1989, Digital Equipment Corporation *)
(* All rights reserved. *)
(* See the file COPYRIGHT for a full description. *)

(* Last Modified On Tue Aug 11 10:03:15 PDT 1992 by muller *)
(* Modified On Fri Mar 20 07:46:13 PST 1992 by kalsow *)

(* User-level (coroutine based) implementation of the Thread, ThreadF and
   RTScheduler interfaces.  All threads live in one circular ring; the
   scheduler (InternalYield) walks that ring to find the next runnable
   thread.  Scheduler data is protected by the counter RT0u.inCritical:
   the timer signal handler only switches threads when it is zero. *)

UNSAFE MODULE Thread EXPORTS Thread, ThreadF, RTScheduler;

IMPORT RT0u, RTMisc, Word, Time, RTThread, Ctypes, Cerrno, Cstring;
IMPORT Unix, Utime, Usignal, FloatMode, SmallIO, Convert, RTProc;
IMPORT Uuio, ThreadEvent, LowPerfTool, RTStack, RTParams;

REVEAL
  (* Remember, the report (p 43-44) says that MUTEX is predeclared and
     <: ROOT; just pretend that we have "TYPE MUTEX <: ROOT" in our
     interface.

     holder is the thread that currently owns the mutex (NIL if it is
     free) and waitingForMe is the head of the list of threads that are
     waiting for the mutex to be released so that they can acquire it
     (the list is continued in the nextWaiting field of the threads) *)
  MUTEX = BRANDED "Mutex 1.0" OBJECT
    holder       : T := NIL;
    waitingForMe : T := NIL;
  END;

  (* Threads that wait on a condition are inserted in the waitingForMe
     list, which is continued in the nextWaiting field of the waiting
     threads. *)
  Condition = BRANDED "Thread.Condition v1.0" OBJECT
    waitingForMe: T := NIL;
  END;

REVEAL
  T = BRANDED "Thread.T v1.5" OBJECT
    state: State;
    id: Id;

    (* our work and its result *)
    closure : Closure;
    result : REFANY := NIL;

    (* the threads are organized in a circular list *)
    previous, next: T;

    (* next thread that waits for:
         CASE state OF
         | waiting  => the same condition;
         | locking  => the same mutex;
         | pausing  => a specified time;
         | blocking => some IO; *)
    nextWaiting: T;

    (* if state = waiting, the condition on which we wait *)
    waitingForCondition: Condition;

    (* if state = locking, the mutex we are trying to acquire *)
    waitingForMutex: Mutex;

    (* if state = pausing, the time at which we can restart *)
    waitingForTime : Time.T;

    (* true if we are waiting during an AlertWait or AlertJoin
       or AlertPause *)
    alertable: BOOLEAN := FALSE;

    (* true if somebody alerted us and we did not TestAlert *)
    alertPending : BOOLEAN := FALSE;

    (* This condition is signaled when the thread terminates; other
       threads that want to join can just wait for it *)
    endCondition: Condition;

    (* where we carry our work.  The first thread runs on the original
       C program stack and its context.stack is NIL *)
    context : Context;

    (* if state = blocking, the descriptors we are waiting on.  The
       save* fields hold the masks as the caller passed them, so they
       can be restored after each polling select. *)
    select : RECORD
      nfds: CARDINAL;
      saveReadFDS, saveWriteFDS, saveExceptFDS: Unix.FDSet;
      readFDS, writeFDS, exceptFDS: UNTRACED REF Unix.FDSet;
      timeout: Time.T;
      hasTimeout: BOOLEAN;
      result: INTEGER;
      errno: INTEGER;
    END;

    (* state that is available to the floating point routines *)
    floatState : FloatMode.ThreadState;
  END;

TYPE
  IntPtr = UNTRACED REF INTEGER;

VAR
  preemption: BOOLEAN;

  (* this is really a constant, but we need to take its address *)
  ZeroTimeout := Utime.struct_timeval {0, 0};

  (* same here *)
  emptyFDS := Unix.FDSet {};

VAR
  (* we start the heavy machinery only when we have more than one thread *)
  multipleThreads: BOOLEAN := FALSE;

  topThread: T; (* the thread in which Main runs *)
  pausedThreads : T;

  (* interval loaded into the virtual timer by StartSwitching
     (tv_sec = 0, tv_usec = 100 * 1000) *)
  selected_interval:= Utime.struct_timeval {0, 100 * 1000};

  (* stack size handed to InitContext when the closure does not ask
     for a specific one *)
  defaultStackSize := 3000;

VAR
  stats: RECORD
    n_forks := 0;
    n_dead := 0;
    n_joins := 0;
  END;

EXCEPTION
  InternalError;
<*FATAL InternalError*>

CONST
  (* Fork yields the processor once every ForkYieldRatio forks *)
  ForkYieldRatio = 5;

VAR
  nextId: Id := 1;

(*------------------------------------------------- user-level procedures ---*)

(* Return a freshly allocated, unlocked mutex. *)
PROCEDURE NewMutex (): Mutex =
  BEGIN
    RETURN (NEW (Mutex));
  END NewMutex;

(* Return a freshly allocated condition with no waiters. *)
PROCEDURE NewCondition (): Condition =
  BEGIN
    RETURN (NEW (Condition));
  END NewCondition;

(* Make sure the default stack size is at least "min". *)
PROCEDURE MinDefaultStackSize (min: CARDINAL) =
  BEGIN
    INC (RT0u.inCritical);
    defaultStackSize := MAX (defaultStackSize, min);
    DEC (RT0u.inCritical);
  END MinDefaultStackSize;

(* Grow the default stack size by "inc". *)
PROCEDURE IncDefaultStackSize (inc: CARDINAL) =
  BEGIN
    INC (RT0u.inCritical);
    INC (defaultStackSize, inc);
    DEC (RT0u.inCritical);
  END IncDefaultStackSize;

(* Create a new thread that will run "cl.apply ()" and return its handle.
   The thread is inserted in the ring right after the calling thread and
   is marked runnable; it does not start running until the scheduler
   transfers to it. *)
PROCEDURE Fork (cl: Closure): T =
  VAR
    t: T;
    size: CARDINAL;
  BEGIN
    (* make sure that thread switching keeps up with thread creation *)
    INC (stats.n_forks);
    IF (stats.n_forks MOD ForkYieldRatio) = 0 THEN Yield () END;

    INC (RT0u.inCritical);

    IF NOT multipleThreads THEN
      (* this is the first time we have more than one thread;
         we can start to consider switching *)
      multipleThreads := TRUE;
      StartSwitching ();
    END;

    t := NEW (T, closure := cl, id := nextId);
    INC (nextId);

    (* determine the size of the stack for this thread;
       a SizedClosure with stackSize = 0 means "use the default" *)
    TYPECASE cl OF
    | SizedClosure (scl) =>
        IF scl.stackSize = 0 THEN
          size := defaultStackSize;
        ELSE
          size := scl.stackSize;
        END;
    ELSE
      size := defaultStackSize;
    END;

    (* allocate a condition variable for this thread *)
    t.endCondition := NewCondition ();

    (* link the thread into the global ring *)
    t.next := self.next;
    t.previous := self;
    self.next.previous := t;
    self.next := t;

    InitContext (t.context, size);
    CanRun (t);
    IF hooks # NIL THEN hooks.fork (t) END;

    DEC (RT0u.inCritical);
    RETURN t;
  END Fork;

(* Wait for "t" to terminate and return its result; not alertable. *)
PROCEDURE Join (t: T): REFANY RAISES {} =
  <*FATAL Alerted*>
  BEGIN
    self.alertable := FALSE;
    RETURN XJoin (t);
  END Join;

(* Like Join, but the wait can be interrupted by Alert. *)
PROCEDURE AlertJoin (t: T): REFANY RAISES {Alerted} =
  BEGIN
    self.alertable := TRUE;
    RETURN XJoin (t);
  END AlertJoin;

(* Common part of Join and AlertJoin: wait on t.endCondition until "t"
   is dying or dead, then mark it dead and return its result.  The
   caller has already set self.alertable. *)
PROCEDURE XJoin (t: T): REFANY RAISES {Alerted} =
  VAR c: Condition;
  BEGIN
    INC (RT0u.inCritical);

    WHILE (t.state # State.dying) AND (t.state # State.dead) DO
      (*** INLINE Wait (RT0u.inCritical, t.endCondition) ***)
      c := t.endCondition;
      ICannotRun (State.waiting);
      self.waitingForCondition := c;
      self.nextWaiting := c.waitingForMe;
      c.waitingForMe := self;
      DEC (RT0u.inCritical);
      InternalYield ();
      INC (RT0u.inCritical);
    END;

    t.state := State.dead;
    IF perfOn THEN PerfChanged (t.id, State.dead); END;
    INC (stats.n_joins);

    DEC (RT0u.inCritical);
    RETURN t.result;
  END XJoin;

(* Release "m", wait on "c", reacquire "m"; not alertable. *)
PROCEDURE Wait (m: Mutex; c: Condition) =
  <*FATAL Alerted*>
  BEGIN
    self.alertable := FALSE;
    XWait (m, c);
  END Wait;

(* Like Wait, but the wait can be interrupted by Alert. *)
PROCEDURE AlertWait (m: Mutex; c: Condition) RAISES {Alerted} =
  BEGIN
    self.alertable := TRUE;
    XWait (m, c);
  END AlertWait;

(* Common part of Wait and AlertWait.  The mutex is reacquired in the
   FINALLY clause, so the caller holds "m" again even when Alerted is
   raised by InternalYield. *)
PROCEDURE XWait (m: Mutex; c: Condition) RAISES {Alerted} =
  BEGIN
    TRY
      INC (RT0u.inCritical);
      EVAL XRelease (m);
      ICannotRun (State.waiting);
      (* push ourselves on the head of the condition's waiting list *)
      self.waitingForCondition := c;
      self.nextWaiting := c.waitingForMe;
      c.waitingForMe := self;
      DEC (RT0u.inCritical);
      InternalYield ();
    FINALLY
      Acquire (m);
    END;
  END XWait;

(* Wake up one thread waiting on "c", if there is one. *)
PROCEDURE Signal (c: Condition) =
  BEGIN
    XSignal (c, 1);
  END Signal;

(* Wake up every thread waiting on "c". *)
PROCEDURE Broadcast (c: Condition) =
  BEGIN
    XSignal (c, -1);
  END Broadcast;

(* Make runnable the threads at the head of c's waiting list.  "limit"
   is decremented for each thread and the loop stops when it reaches
   zero or the list is empty; a negative limit therefore never reaches
   zero and empties the list. *)
PROCEDURE XSignal (c: Condition; limit: INTEGER) =
  VAR t: T;
  BEGIN
    INC (RT0u.inCritical);
    LOOP
      t := c.waitingForMe;
      IF (t = NIL) THEN EXIT END;
      c.waitingForMe := t.nextWaiting;
      CanRun (t);
      DEC (limit);
      IF limit = 0 THEN EXIT END;
    END;
    DEC (RT0u.inCritical);
  END XSignal;

(* Record that "t" has been alerted.  The flag is examined by the
   scheduler and by TestAlert. *)
PROCEDURE Alert (t: T) =
  BEGIN
    INC (RT0u.inCritical);
    t.alertPending := TRUE;
    DEC (RT0u.inCritical);
  END Alert;

(* Return whether the calling thread has a pending alert, and clear it. *)
PROCEDURE TestAlert (): BOOLEAN =
  VAR result: BOOLEAN;
  BEGIN
    INC (RT0u.inCritical);
    result := self.alertPending;
    self.alertPending := FALSE;
    DEC (RT0u.inCritical);
    RETURN result;
  END TestAlert;

(* Give the scheduler a chance to run another thread; not alertable. *)
PROCEDURE Yield () =
  <*FATAL Alerted*>
  BEGIN
    self.alertable := FALSE;
    InternalYield ();
  END Yield;

(* Return the currently running thread. *)
PROCEDURE Self (): T =
  BEGIN
    RETURN self;
  END Self;

(*--------------------------------------------------------------- MUTEXes ---*)

(* Lock "m" on behalf of the calling thread.  If the mutex is held, the
   caller queues itself on m.waitingForMe in state "locking", yields, and
   tries again once it has been made runnable. *)
PROCEDURE Acquire (m: Mutex) =
  <*FATAL Alerted*>
  BEGIN
    LOOP
      INC (RT0u.inCritical);
      IF m.holder = NIL THEN
        <* ASSERT self # NIL *>
        m.holder := self;
        DEC (RT0u.inCritical);
        RETURN;
      END;
      ICannotRun (State.locking);
      self.waitingForMutex := m;
      self.nextWaiting := m.waitingForMe;
      self.alertable := FALSE;
      m.waitingForMe := self;
      DEC (RT0u.inCritical);
      InternalYield ();
    END;
  END Acquire;

(* Unlock "m".  If a waiting thread was made runnable, yield so that it
   gets a chance to take the mutex. *)
PROCEDURE Release (m: Mutex) =
  <*FATAL Alerted*>
  VAR waiters: BOOLEAN;
  BEGIN
    INC (RT0u.inCritical);
    waiters := XRelease (m);
    DEC (RT0u.inCritical);
    IF waiters THEN
      self.alertable := FALSE;
      InternalYield ();
    END;
  END Release;

(* Unlock "m" and make the last thread of its waiting list runnable.
   Acquire pushes waiters on the head of the list, so the last element
   is the one that has been queued the longest.  Returns TRUE iff a
   thread was woken.  It is a fatal error (see SleazyRelease) if the
   caller does not hold "m". *)
PROCEDURE XRelease (m: Mutex): BOOLEAN =
  (* called while inCritical *)
  VAR t, last_t: T;
  BEGIN
    IF m.holder # self THEN SleazyRelease (m) END;
    m.holder := NIL;

    t := m.waitingForMe;
    IF (t = NIL) THEN RETURN FALSE END;

    (* search for the end:
       t == last thread, last_t == second to last one *)
    last_t := NIL;
    WHILE (t.nextWaiting # NIL) DO
      last_t := t;
      t := t.nextWaiting
    END;

    IF (last_t # NIL) THEN
      last_t.nextWaiting := NIL; (* multiple threads are waiting *)
    ELSE
      m.waitingForMe := NIL; (* only one thread is waiting *)
    END;

    t.nextWaiting := NIL;
    CanRun (t);
    RETURN TRUE;
  END XRelease;

(* Report an attempt to release a mutex that the caller does not hold:
   dump every thread, describe the mutex on stderr and abort through
   RTMisc.FatalError. *)
PROCEDURE SleazyRelease (m: Mutex) =
  BEGIN
    DumpEverybody ();
    SmallIO.PutText (SmallIO.stderr, "*** Mutex ");
    OutA (m, 0);
    IF m.holder = NIL THEN
      SmallIO.PutText (SmallIO.stderr, " is not locked.\n");
    ELSE
      SmallIO.PutText (SmallIO.stderr, " is held by thread #");
      OutI (m.holder.id, 0);
      SmallIO.PutText (SmallIO.stderr, ".\n");
    END;
    RTMisc.FatalError ("Thread.m3", 381, "illegal Thread.Release");
  END SleazyRelease;

(*--------------------------------------------- garbage collector support ---*)

(* Call "p" on the live stack range and on the saved register buffer of
   every thread in the ring, starting with the calling thread. *)
PROCEDURE ProcessStacks (p: PROCEDURE (start, stop: ADDRESS)) =
  VAR
    t := self;
    start, stop: ADDRESS;
  BEGIN
    (* save my state, so that my own context.buf is up to date *)
    EVAL RTThread.Save (self.context.buf);

    REPEAT
      Tos (t.context, start, stop);

      (* process the stack *)
      p (start, stop);

      WITH z = t.context.buf DO
        (* process the registers *)
        p (ADR (z), ADR (z) + ADRSIZE (z))
      END;

      t := t.next;
    UNTIL t = self;
  END ProcessStacks;

(*------------------------------------------------- I/O and Timer support ---*)

(* Suspend the calling thread until time "until".  If "alertable", the
   pause can be cut short by an Alert, in which case Alerted is raised. *)
PROCEDURE Pause (until: Time.T; alertable := FALSE) RAISES {Alerted} =
  BEGIN
    INC (RT0u.inCritical);
    self.waitingForTime := until;
    self.alertable := alertable;
    ICannotRun (State.pausing);
    DEC (RT0u.inCritical);
    InternalYield ();
  END Pause;

(* Alertable version of IOSelect. *)
PROCEDURE IOAlertSelect (nfds: INTEGER;
                         readfds, writefds, exceptfds: UNTRACED REF Unix.FDSet;
                         timeout: UNTRACED REF Utime.struct_timeval := NIL)
    : INTEGER RAISES {Alerted} =
  BEGIN
    self.alertable := TRUE;
    RETURN XIOSelect (nfds, readfds, writefds, exceptfds, timeout);
  END IOAlertSelect;

(* A select call that blocks only the calling thread, not the whole
   process; not alertable. *)
PROCEDURE IOSelect (nfds: INTEGER;
                    readfds, writefds, exceptfds: UNTRACED REF Unix.FDSet;
                    timeout: UNTRACED REF Utime.struct_timeval := NIL)
    : INTEGER =
  <*FATAL Alerted*>
  BEGIN
    self.alertable := FALSE;
    RETURN XIOSelect (nfds, readfds, writefds, exceptfds, timeout);
  END IOSelect;

(* Common part of IOSelect and IOAlertSelect.  The request is first
   polled with a zero timeout; if nothing is ready and the caller is
   willing to wait, the request is recorded in self.select and the
   thread goes to state "blocking".  The scheduler then polls on its
   behalf and stores the outcome in self.select.result / errno. *)
PROCEDURE XIOSelect (nfds: INTEGER;
                     readFDS, writeFDS, exceptFDS: UNTRACED REF Unix.FDSet;
                     timeout: UNTRACED REF Utime.struct_timeval)
    : INTEGER RAISES {Alerted} =
  VAR
    n: INTEGER;
    saveReadFDS, saveWriteFDS, saveExceptFDS: Unix.FDSet;
  BEGIN
    (* If we are in a single-threaded program
       do just what the user wants *)
    IF NOT multipleThreads THEN
      self.alertable := FALSE;
      RETURN Unix.select (nfds, readFDS, writeFDS, exceptFDS, timeout);
    END;

    (* First, try to satisfy the request immediately *)

    (* we need to save the values, because the call to select may modify
       them and we need them later.  A NIL mask is replaced by a pointer
       to the shared empty set so that it can be dereferenced. *)
    IF readFDS = NIL THEN readFDS := ADR (emptyFDS); END;
    saveReadFDS := readFDS^;
    IF writeFDS = NIL THEN writeFDS := ADR (emptyFDS); END;
    saveWriteFDS := writeFDS^;
    IF exceptFDS = NIL THEN exceptFDS := ADR (emptyFDS); END;
    saveExceptFDS := exceptFDS^;

    n := Unix.select (nfds, readFDS, writeFDS, exceptFDS, ADR (ZeroTimeout));

    (* done if something is ready, an error occurred, or the caller
       asked for a pure poll (zero timeout) *)
    IF n # 0
      OR (timeout # NIL AND timeout.tv_sec = 0 AND timeout.tv_usec = 0) THEN
      self.alertable := FALSE;
      RETURN n;
    END;

    (* restore the masks *)
    readFDS^ := saveReadFDS;
    writeFDS^ := saveWriteFDS;
    exceptFDS^ := saveExceptFDS;

    (* This thing blocks, schedule it for later *)
    INC (RT0u.inCritical);
    self.select.nfds := nfds;
    self.select.saveReadFDS := saveReadFDS;
    self.select.saveWriteFDS := saveWriteFDS;
    self.select.saveExceptFDS := saveExceptFDS;
    self.select.readFDS := readFDS;
    self.select.writeFDS := writeFDS;
    self.select.exceptFDS := exceptFDS;
    IF timeout # NIL THEN
      (* convert the relative timeout into an absolute deadline *)
      self.select.hasTimeout := TRUE;
      self.select.timeout :=
        Time.Add (Time.Now (), Time.T {timeout.tv_sec, timeout.tv_usec});
    ELSE
      self.select.hasTimeout := FALSE;
    END;
    ICannotRun (State.blocking);
    DEC (RT0u.inCritical);
    InternalYield ();

    Cerrno.errno := self.select.errno;
    RETURN (self.select.result);
  END XIOSelect;

(*------------------------------------------------ timer-based preemption ---*)

PROCEDURE StartSwitching () =
  (* set the SIGVTALRM timer and handler; can be called to change the
     switching interval.  Does nothing when preemption is disabled. *)
  VAR
    it, oit: Utime.struct_itimerval;
    sv, osv: Usignal.struct_sigvec;
  BEGIN
    IF preemption THEN
      sv.sv_handler := LOOPHOLE (switch_thread, Usignal.SignalHandler);
      sv.sv_mask := Usignal.empty_sv_mask;
      sv.sv_flags := 0;
      IF Usignal.sigvec (Usignal.SIGVTALRM, sv, osv) # 0 THEN
        RAISE InternalError;
      END;

      it.it_interval.tv_sec := selected_interval.tv_sec;
      it.it_interval.tv_usec := selected_interval.tv_usec;
      it.it_value.tv_sec := selected_interval.tv_sec;
      it.it_value.tv_usec := selected_interval.tv_usec;
      IF Utime.setitimer (Utime.ITIMER_VIRTUAL, it, oit) # 0 THEN
        RAISE InternalError;
      END;

      allow_sigvtalrm ();
    END;
  END StartSwitching;

TYPE
  SignalData = UNTRACED REF Usignal.struct_sigcontext;

(* SIGVTALRM handler: switch to another thread, unless the interrupted
   code was inside a critical section. *)
PROCEDURE switch_thread (<*UNUSED*> sig, code: INTEGER;
                         <*UNUSED*> scp: SignalData) RAISES {Alerted} =
  BEGIN
    allow_sigvtalrm ();
    IF RT0u.inCritical = 0 THEN InternalYield () END;
  END switch_thread;

(* Remove SIGVTALRM from the signal mask.  The current mask is obtained
   from the value returned by sigsetmask (0). *)
PROCEDURE allow_sigvtalrm () =
  VAR i : Word.T;
  BEGIN
    i := Usignal.sigsetmask (0);
    i := Word.And (i, Word.Not (Usignal.sigmask (Usignal.SIGVTALRM)));
    EVAL Usignal.sigsetmask (i);
  END allow_sigvtalrm;

(* Add SIGVTALRM to the signal mask.  The current mask is obtained from
   the value returned by sigsetmask (0). *)
PROCEDURE disallow_sigvtalrm () =
  VAR i : Word.T;
  BEGIN
    i := Usignal.sigsetmask (0);
    i := Word.Or (i, Usignal.sigmask (Usignal.SIGVTALRM));
    EVAL Usignal.sigsetmask (i);
  END disallow_sigvtalrm;

(* Change the preemption interval; the timer is reprogrammed at once if
   thread switching has already been started. *)
PROCEDURE SetSwitchingInterval (i: Time.T) =
  BEGIN
    selected_interval.tv_sec := i.seconds;
    selected_interval.tv_usec := i.microseconds;
    IF multipleThreads THEN StartSwitching () END;
  END SetSwitchingInterval;

(*------------------------------------------------------------- scheduler ---*)

(* Mark "t" runnable and forget what it was waiting for.  The caller is
   responsible for unlinking "t" from any waiting list beforehand. *)
PROCEDURE CanRun (t: T) =
  BEGIN
    t.state := State.alive;
    t.nextWaiting := NIL;
    t.waitingForCondition := NIL;
    t.waitingForMutex := NIL;
    IF perfOn THEN PerfChanged (t.id, State.alive); END;
  END CanRun;

(* Put the calling thread in the (non-runnable) state "newState". *)
PROCEDURE ICannotRun (newState: State) =
  BEGIN
    self.state := newState;
    IF perfOn THEN PerfChanged (self.id, newState); END;
  END ICannotRun;

(* The scheduler.  Starting with the thread after the caller, walk the
   ring looking for a thread that is, or can be made, runnable:
     - a waiting/pausing/blocking thread with a pending alert (if it is
       alertable),
     - a pausing thread whose time has come,
     - a blocking thread whose select is satisfied or has timed out.
   Dying and dead threads met on the way are unlinked from the ring.
   If a runnable thread is found, control is transferred to it; when the
   caller is eventually resumed, Alerted is raised if it was alertable
   and has a pending alert.  If nobody can run, the process sleeps in
   select until the earliest deadline or until some descriptor is ready,
   and the ring is scanned again.  If there is nothing to wait for
   either, the program is deadlocked and is aborted.

   Must be entered with RT0u.inCritical = 0. *)
PROCEDURE InternalYield () RAISES {Alerted} =
  VAR
    t, from, tmp   : T;
    blockingRead   : Unix.FDSet;
    blockingWrite  : Unix.FDSet;
    blockingExcept : Unix.FDSet;
    blockingNfds   := 0;
    somePausing    := FALSE;
    someBlocking   := FALSE;
    now            : Time.T;
    earliest       : Time.T;
    timeout        : Time.T;
    n              : INTEGER;
    do_alert       : BOOLEAN;
    did_delete     : BOOLEAN;
  BEGIN
    INC (RT0u.inCritical);
    <*ASSERT RT0u.inCritical = 1 *>

    from := self.next; (* remember where we started *)
    LOOP
      (* every scan of the ring starts from a clean slate *)
      t := from;
      blockingRead := Unix.FDSet {};
      blockingWrite := Unix.FDSet {};
      blockingExcept := Unix.FDSet {};
      blockingNfds := 0;
      now := Time.Now ();
      did_delete := FALSE;
      (* reset these with the other per-scan accumulators, so that
         "earliest" and the sleep decision below only reflect the
         threads seen during this scan *)
      somePausing := FALSE;
      someBlocking := FALSE;

      LOOP
        CASE t.state OF

        | State.waiting =>
            IF t.alertable AND t.alertPending THEN
              (* unlink t from the waiting list of its condition *)
              WITH c = t.waitingForCondition DO
                IF c.waitingForMe = t THEN
                  c.waitingForMe := t.nextWaiting;
                ELSE
                  VAR tt := c.waitingForMe; BEGIN
                    WHILE tt.nextWaiting # t DO tt := tt.nextWaiting; END;
                    tt.nextWaiting := t.nextWaiting;
                  END;
                END;
              END;
              CanRun (t);
              EXIT;
            END;

        | State.locking =>
            <*ASSERT NOT t.alertable*>

        | State.pausing =>
            IF t.alertable AND t.alertPending THEN
              CanRun (t);
              EXIT;
            ELSIF Time.Compare (t.waitingForTime, now) <= 0 THEN
              CanRun (t);
              EXIT;
            ELSIF NOT somePausing THEN
              earliest := t.waitingForTime;
              somePausing := TRUE;
            ELSIF Time.Compare (t.waitingForTime, earliest) < 0 THEN
              earliest := t.waitingForTime;
            END;

        | State.blocking =>
            IF t.alertable AND t.alertPending THEN
              CanRun (t);
              EXIT;
            ELSE
              (* poll: is the thread's select satisfied now ? *)
              n := Unix.select (t.select.nfds, t.select.readFDS,
                                t.select.writeFDS, t.select.exceptFDS,
                                ADR (ZeroTimeout));

              (* Yes, or error. Make it runnable *)
              IF n # 0 THEN
                t.select.result := n;
                t.select.errno := Cerrno.errno;
                CanRun (t);
                EXIT;

              (* No, but its timer may have expired *)
              ELSIF t.select.hasTimeout
                AND Time.Compare (t.select.timeout, now) <= 0 THEN
                t.select.result := 0;
                t.select.errno := 0;
                CanRun (t);
                EXIT;

              (* No. Remember what we are looking for if nobody can run *)
              ELSE
                (* restore the FDS *)
                t.select.readFDS^ := t.select.saveReadFDS;
                t.select.writeFDS^ := t.select.saveWriteFDS;
                t.select.exceptFDS^ := t.select.saveExceptFDS;

                blockingNfds := MAX (blockingNfds, t.select.nfds);
                blockingRead := blockingRead + t.select.saveReadFDS;
                blockingWrite := blockingWrite + t.select.saveWriteFDS;
                blockingExcept := blockingExcept + t.select.saveExceptFDS;
                someBlocking := TRUE;

                IF t.select.hasTimeout THEN
                  IF NOT somePausing THEN
                    earliest := t.select.timeout;
                    somePausing := TRUE;
                  ELSIF Time.Compare (t.select.timeout, earliest) < 0 THEN
                    earliest := t.select.timeout;
                  END;
                END;
              END;
            END;

        | State.dying, State.dead =>
            (* remove this guy from the ring *)
            IF perfOn THEN PerfDeleted (t.id); END;
            IF hooks # NIL THEN hooks.die (t) END;
            tmp := t.previous;
            IF (t = from) THEN from := tmp END;
            t.next.previous := tmp;
            tmp.next := t.next;
            t.next := NIL;
            t.previous := NIL;
            t := tmp;
            did_delete := TRUE;

        | State.alive =>
            EXIT;

        END; (* case *)

        t := t.next;
        IF t = from THEN EXIT; END;
      END;

      IF t.state = State.alive THEN
        IF perfOn THEN PerfRunning (t.id); END;

        (* At least one thread wants to run; transfer to it *)
        Transfer (self.context, t.context, t);

        (* we get here when the calling thread runs again *)
        do_alert := self.alertable AND self.alertPending;
        self.alertable := FALSE;
        IF do_alert THEN self.alertPending := FALSE END;

        DEC (RT0u.inCritical);
        IF do_alert THEN RAISE Alerted END;
        RETURN;

      ELSIF did_delete THEN
        (* run through the ring one more time before we block
           waiting for I/O, pause for a timer or declare deadlock. *)

      ELSIF somePausing THEN
        (* sleep until the earliest deadline, or until one of the
           descriptors somebody is blocked on becomes ready *)
        IF perfOn THEN PerfRunning (-1); END;
        timeout := Time.Subtract (earliest, now);
        EVAL Unix.select (blockingNfds, ADR (blockingRead),
                          ADR (blockingWrite), ADR (blockingExcept),
                          ADR (timeout));

      ELSIF someBlocking THEN
        (* no deadline: sleep until some descriptor is ready *)
        IF perfOn THEN PerfRunning (-1); END;
        EVAL Unix.select (blockingNfds, ADR (blockingRead),
                          ADR (blockingWrite), ADR (blockingExcept), NIL);

      ELSE
        IF perfOn THEN PerfRunning (-1); END;
        DumpEverybody ();
        RTMisc.FatalError (NIL, 0, "Deadlock !");
      END;
    END;
  END InternalYield;

(*-------------------------------------------------- low-level coroutines ---*)

CONST
  (* value written at both ends of each thread stack by InitContext and
     checked by Transfer *)
  seal = 123456;

TYPE
  Context = RECORD
    stack: RTStack.T;        (* NIL for the first thread *)
    stackTop: ADDRESS;
    stackBottom: ADDRESS;
    handlers: ADDRESS;       (* saved value of currentHandlers *)
    errno: INTEGER;          (* saved value of Cerrno.errno *)
    buf: RTThread.State;     (* saved machine state *)
  END;

VAR
  self: T; (* the currently running thread *)

VAR
  modelFrame: UNTRACED REF ARRAY OF Word.T;
  modelSP : ADDRESS;
  modelBuf : RTThread.State;

(* These two variables contain thread-dependent data; however, we cannot
   declare them as Modula-3 variables, because they are needed in every
   compilation unit, even when no Thread-related module is imported.
   They are declared in a small piece of the runtime, in C.
   It is important to note that these are NOT traced refs. *)

(* the two variables are _ThreadSup__{handlers,stackLimit} and are declared
   in the ThreadF interface *)

(* The general strategy is:
   - at initialization time, get an idea of what the stack frame and
     environment for a routine is; this is done by
     InitTopContext/DetermineContext.  This context is stored in the
     "model" variables.
   - when a new thread is forked, its stack is initialized from the model
     stack, and the environment is restored after modifying the entries
     that depend on the stack position (eg. SP, AP, FP).  Running in that
     new context will send us in DetermineContext that will execute the
     thread closure (actually a shell that runs that closure). *)

(* Initialize the context of the first thread and record the model frame
   used for all future threads. *)
PROCEDURE InitTopContext (VAR c: Context) =
  VAR env: RTThread.State;
  BEGIN
    (* The first thread runs on the original stack,
       we don't want any checks *)
    <* ASSERT bottom_of_stack # NIL *>
    c.stack := NIL;
    c.stackTop := NIL;
    c.stackBottom := bottom_of_stack;
    c.handlers := NIL;
    c.errno := 0;

    (* determine what should go in the stack of future threads *)
    WITH i = RTThread.Save (env) DO <* ASSERT i = 0 *> END;
    DetermineContext (RTThread.SP (env));
  END InitTopContext;

PROCEDURE DetermineContext (oldSP: ADDRESS) =
  (* This routine looks at the stack frame for this call and takes it as
     a model for the frame to put in the stacks of forked threads.
     It also saves the jmp_buf at the beginning of the call in a global;
     that jmp_buf will be used (after updating the stack pointer) for
     forked threads *)
  <*FATAL Alerted*>
  CONST FramePad = 2; (* additional words below oldSP to copy *)
  BEGIN
    IF (RTThread.Save (modelBuf) = 0) THEN
      (* first time through; this part is executed only once to
         determine the model *)
      modelSP := RTThread.SP (modelBuf);

      (* Copy the frame (plus pad) to modelFrame and remember where
         that should go in the new stacks *)
      RTThread.FlushStackCache ();
      modelFrame := NEW (UNTRACED REF ARRAY OF Word.T,
                         ABS (modelSP - oldSP) DIV ADRSIZE (Word.T)
                           + 1 + FramePad);
      IF stack_grows_down = 0 THEN
        <* ASSERT oldSP < modelSP *>
        DEC (oldSP, FramePad * ADRSIZE (Word.T));
        EVAL Cstring.memcpy (ADR (modelFrame [0]),
                             LOOPHOLE (oldSP, ADDRESS),
                             NUMBER (modelFrame^) * BYTESIZE (Word.T));
      ELSE
        <* ASSERT oldSP > modelSP *>
        INC (oldSP, FramePad * ADRSIZE (Word.T));
        EVAL Cstring.memcpy (ADR (modelFrame [0]),
                             LOOPHOLE (modelSP, ADDRESS),
                             NUMBER (modelFrame^) * BYTESIZE (Word.T));
      END;

    ELSE
      (* we are starting the execution of a forked thread *)
      currentHandlers := self.context.handlers;
      currentStackLimit := self.context.stackTop;
      Cerrno.errno := self.context.errno;
      allow_sigvtalrm ();
      DEC (RT0u.inCritical);

      FloatMode.InitThread (self.floatState);
      self.result := self.closure.apply ();

      (* the closure has returned: wake up the joiners and go away;
         the scheduler unlinks dying threads from the ring *)
      INC (RT0u.inCritical);
      RTStack.Dispose (self.context.stack);
      Broadcast (self.endCondition);
      ICannotRun (State.dying);
      INC (stats.n_dead);
      DEC (RT0u.inCritical);
      InternalYield ();
      <* ASSERT FALSE *>
    END;
  END DetermineContext;

(* Allocate a stack of the requested size for a new thread and build in
   "c" a context that, when transferred to, resumes in DetermineContext
   on that stack. *)
PROCEDURE InitContext (VAR c: Context; size: INTEGER) =
  VAR
    stack: RTStack.T;
    s_first, s_last: ADDRESS;
    offset: INTEGER;
    newSP: ADDRESS;
  BEGIN
    (* allocate a new stack *)
    stack := RTStack.New (size);
    RTStack.GetBounds (stack, s_first, s_last);

    (* initialize the context fields *)
    c.stack := stack;
    c.stackTop := s_first;
    c.stackBottom := s_last - ADRSIZE (Word.T);
    c.handlers := NIL;
    c.errno := Cerrno.errno;

    (* mark the ends of the stack for a sanity check *)
    LOOPHOLE (c.stackTop, IntPtr)^ := seal;
    LOOPHOLE (c.stackBottom, IntPtr)^ := seal;

    (* place the model frame at the start of the stack, leaving the
       sealed word alone *)
    IF stack_grows_down = 0 THEN
      newSP := RTMisc.Align (s_first + ADRSIZE (Word.T),
                             RTStack.StackFrameAlignment);
    ELSE
      newSP := RTMisc.Align (s_last - 2 * ADRSIZE (Word.T)
                               - NUMBER (modelFrame^) * ADRSIZE (Word.T)
                               - RTStack.StackFrameAlignment + 1,
                             RTStack.StackFrameAlignment);
    END;

    offset := newSP - modelSP;
    EVAL Cstring.memcpy (newSP, ADR (modelFrame [0]),
                         NUMBER (modelFrame^) * BYTESIZE (Word.T));
    RTThread.UpdateFrameForNewSP (newSP, offset);
    c.buf := modelBuf;
    RTThread.UpdateStateForNewSP (c.buf, offset);
  END InitContext;

(* Suspend the thread whose context is "from" and resume the one whose
   context is "to"; "new_self" becomes the current thread.  The call
   returns when some later Transfer resumes "from".  SIGVTALRM is masked
   for the duration of the switch.  Nothing happens if both contexts are
   the same. *)
PROCEDURE Transfer (VAR from, to: Context; new_self: T) =
  BEGIN
    (* the seals at both ends of the stacks must be intact *)
    <* ASSERT (from.stack = NIL)
           OR (LOOPHOLE (from.stackTop, IntPtr)^ = seal
               AND LOOPHOLE (from.stackBottom, IntPtr)^ = seal) *>
    <* ASSERT (to.stack = NIL)
           OR (LOOPHOLE (to.stackTop, IntPtr)^ = seal
               AND LOOPHOLE (to.stackBottom, IntPtr)^ = seal) *>

    IF (ADR (from) # ADR (to)) THEN
      disallow_sigvtalrm ();
      IF RTThread.Save (from.buf) = 0 THEN
        (* save the 'from' context and jump to 'to' *)
        from.handlers := currentHandlers;
        from.errno := Cerrno.errno;
        self := new_self;
        RTThread.Restore (to.buf, 1);
      ELSE
        (* we returned to resume 'from' *)
        currentHandlers := from.handlers;
        currentStackLimit := from.stackTop;
        Cerrno.errno := from.errno;
      END;
      allow_sigvtalrm ();
    END;
  END Transfer;

(* Return in "start" and "stop" the bounds of the part of the stack of
   "c" delimited by its saved stack pointer; the direction depends on
   stack_grows_down. *)
PROCEDURE Tos (READONLY c: Context; VAR start, stop: ADDRESS) =
  BEGIN
    IF stack_grows_down = 0 THEN
      start := c.stackTop + ADRSIZE (Word.T);
      stop := RTThread.SP (c.buf);
    ELSE
      start := RTThread.SP (c.buf);
      stop := c.stackBottom - ADRSIZE (Word.T);
    END;
  END Tos;

(* Return the address of the floating-point state of the current thread. *)
PROCEDURE MyFPState (): UNTRACED REF FloatMode.ThreadState =
  BEGIN
    RETURN ADR (self.floatState);
  END MyFPState;

(*----------------------------------------------------- debugging support ---*)

CONST
  (* text printed by DumpThread for each thread state *)
  WaitTag = ARRAY State OF TEXT {
    "*ready*", "condition ", "mutex ", "timer ", "I/O ",
    "*dying*", "*dead*" };

(* Print on stderr a table with one line per thread in the ring. *)
PROCEDURE DumpEverybody () =
  VAR t: T;
  BEGIN
    INC (RT0u.inCritical);
    SmallIO.PutText (SmallIO.stderr, "\n\n*****************************");
    SmallIO.PutText (SmallIO.stderr, "**********************************\n");
    SmallIO.PutText (SmallIO.stderr, " id Thread.T closure root");
    SmallIO.PutText (SmallIO.stderr, " A* waiting for\n");

    t := self;
    REPEAT
      IF (t = NIL) THEN
        (* the ring is corrupted; stop instead of crashing *)
        SmallIO.PutText (SmallIO.stderr, "!!! NIL thread in ring !!!\n");
        EXIT;
      END;
      DumpThread (t);
      t := t.next;
    UNTIL (t = self);

    SmallIO.PutText (SmallIO.stderr, "*****************************");
    SmallIO.PutText (SmallIO.stderr, "**********************************\n");
    SmallIO.Flush (SmallIO.stderr);
    DEC (RT0u.inCritical);
  END DumpEverybody;

(* Print one line of the thread table for "t": a '>' marker for the
   current thread, its id, its address, its closure, the procedure the
   closure runs, its alert flags, and what it is waiting for. *)
PROCEDURE DumpThread (t: T) =
  (* these types are used to reach through the closure object to its
     method table and fetch a code address from it *)
  TYPE ClosureMethods = UNTRACED REF ARRAY [0..1] OF ADDRESS;
  TYPE ClosureObject = UNTRACED REF ClosureMethods;
  VAR
    pc, proc: ADDRESS;
    m: MUTEX;
    co: ClosureObject;
    name: RTProc.Name;
    len: INTEGER;
  BEGIN
    IF (t = self) THEN
      SmallIO.PutChar (SmallIO.stderr, '>');
    ELSE
      SmallIO.PutChar (SmallIO.stderr, ' ');
    END;

    (* thread ID *)
    OutI (t.id, 3);

    (* Thread.T *)
    OutA (t, 11);

    (* closure *)
    OutA (t.closure, 11);

    (* initial PC *)
    SmallIO.PutChar (SmallIO.stderr, ' ');
    pc := NIL;
    co := LOOPHOLE (t.closure, ClosureObject);
    IF (co # NIL) AND (co^ # NIL) THEN pc := co^^[1] END;
    IF (co = NIL) THEN
      (* only the first thread is created without a closure *)
      SmallIO.PutText (SmallIO.stderr, "*main program* ");
    ELSE
      RTProc.FromPC (pc, proc, name);
      IF (proc = NIL) OR (proc # pc) THEN
        (* no procedure starts exactly at pc; print the raw address *)
        OutA (LOOPHOLE (pc, REFANY), 19);
      ELSE
        len := Cstring.strlen (name);
        SmallIO.PutChars (SmallIO.stderr, name, len);
        Pad (19, len);
      END;
    END;

    (* alert status *)
    IF (t.alertable) THEN
      SmallIO.PutText (SmallIO.stderr, "A");
    ELSE
      SmallIO.PutText (SmallIO.stderr, " ");
    END;
    IF (t.alertPending) THEN
      SmallIO.PutText (SmallIO.stderr, "* ");
    ELSE
      SmallIO.PutText (SmallIO.stderr, " ");
    END;

    (* state *)
    SmallIO.PutText (SmallIO.stderr, WaitTag [t.state]);
    CASE t.state OF
    | State.alive =>
        (* nothing *)
    | State.waiting =>
        OutA (t.waitingForCondition, 0);
    | State.locking =>
        m := t.waitingForMutex;
        OutA (m, 0);
        IF (m # NIL) THEN
          IF (m.holder = NIL) THEN
            SmallIO.PutText (SmallIO.stderr, " (unlocked)");
          ELSE
            SmallIO.PutText (SmallIO.stderr, " (held by #");
            OutI (m.holder.id, 0);
            SmallIO.PutText (SmallIO.stderr, ")");
          END;
        END;
    | State.blocking =>
        (* nothing *)
    | State.pausing =>
        (* nothing *)
    | State.dying =>
        (* nothing *)
    | State.dead =>
        (* nothing *)
    END;

    SmallIO.PutChar (SmallIO.stderr, '\n');
  END DumpThread;

(* Print "i" in decimal on stderr, right-justified in "width" columns. *)
PROCEDURE OutI (i: INTEGER; width: INTEGER) =
  <*FATAL Convert.Failed*>
  VAR buf : ARRAY [0..31] OF CHAR;
  VAR len := Convert.FromInt (buf, i);
  BEGIN
    Pad (width, len);
    SmallIO.PutChars (SmallIO.stderr, ADR (buf[0]), len);
  END OutI;

(* Print the address "a" in hexadecimal with a "0x" prefix on stderr,
   right-justified in "width" columns. *)
PROCEDURE OutA (a: REFANY; width: INTEGER) =
  <*FATAL Convert.Failed*>
  VAR buf : ARRAY [0..31] OF CHAR;
  VAR len := Convert.FromInt (buf, LOOPHOLE (a, INTEGER), 16);
  BEGIN
    Pad (width, len+2);
    SmallIO.PutText (SmallIO.stderr, "0x");
    SmallIO.PutChars (SmallIO.stderr, ADR (buf[0]), len);
  END OutA;

VAR
  (* a supply of blanks for Pad *)
  pad := ARRAY [0..8] OF CHAR { ' ', .. };

(* Print "min - used" blanks on stderr (nothing if used >= min).
   The blanks come from the fixed-size "pad" array, so they are written
   in chunks of at most NUMBER (pad) characters: callers ask for fields
   as wide as 19 columns, which is more than the array holds, and a
   single PutChars of that length would read past its end. *)
PROCEDURE Pad (min, used: INTEGER) =
  VAR remaining := min - used;
  BEGIN
    WHILE remaining > 0 DO
      WITH chunk = MIN (remaining, NUMBER (pad)) DO
        SmallIO.PutChars (SmallIO.stderr, ADR (pad[0]), chunk);
        DEC (remaining, chunk);
      END;
    END;
  END Pad;

(*------------------------------------------------------ ShowThread hooks ---*)

VAR
  perfR, perfW: Ctypes.int;
  perfOn: BOOLEAN := FALSE;

(* Try to start the "showthread" tool; if that succeeds, turn event
   reporting on and arrange for PerfStop to run at exit. *)
PROCEDURE PerfStart () =
  BEGIN
    IF LowPerfTool.ParamStartAndWait ("showthread", perfR, perfW) THEN
      perfOn := TRUE;
      EVAL RTMisc.RegisterExitor (PerfStop);
    END;
  END PerfStart;

(* Close the descriptor on which events are written. *)
PROCEDURE PerfStop (<*UNUSED*>n: INTEGER := 0) =
  BEGIN
    (* UNSAFE, but needed to prevent deadlock if we're crashing! *)
    EVAL Unix.close (perfW);
  END PerfStop;

CONST
  (* size of a ThreadEvent.T in CHARs, rounded up *)
  EventSize = (BITSIZE(ThreadEvent.T) + BITSIZE(CHAR) - 1) DIV BITSIZE(CHAR);

TYPE
  TE = ThreadEvent.Kind;

(* Report that thread "id" changed to state "s".  Reporting is turned
   off if the write fails. *)
PROCEDURE PerfChanged (id: Id; s: State) =
  VAR e := ThreadEvent.T {kind := TE.Changed, id := id, state := s};
  BEGIN
    IF Uuio.write (perfW, ADR (e), EventSize) = -1 THEN
      perfOn := FALSE;
    END;
  END PerfChanged;

(* Report that thread "id" was removed.  Reporting is turned off if the
   write fails. *)
PROCEDURE PerfDeleted (id: Id) =
  VAR e := ThreadEvent.T {kind := TE.Deleted, id := id};
  BEGIN
    IF Uuio.write (perfW, ADR (e), EventSize) = -1 THEN
      perfOn := FALSE;
    END;
  END PerfDeleted;

(* Report that thread "id" is now running; the scheduler passes -1 when
   no thread can run.  Reporting is turned off if the write fails. *)
PROCEDURE PerfRunning (id: Id) =
  VAR e := ThreadEvent.T {kind := TE.Running, id := id};
  BEGIN
    IF Uuio.write (perfW, ADR (e), EventSize) = -1 THEN
      perfOn := FALSE;
    END;
  END PerfRunning;

(*--------------------------------------------------------- ThreadF hooks ---*)

VAR
  hooks: Hooks := NIL;

(* Install "h" as the fork/die hooks and return the previous ones.  If
   "init" is TRUE and "h" is not NIL, h.fork is called for every thread
   currently in the ring. *)
PROCEDURE RegisterHooks(h: Hooks; init := TRUE): Hooks RAISES {}=
  VAR
    oldHooks: Hooks;
    t: T;
  BEGIN
    INC (RT0u.inCritical);
    oldHooks := hooks;
    hooks := h;
    IF init AND hooks # NIL THEN
      t := self;
      REPEAT
        hooks.fork (t);
        t := t.next;
      UNTIL (t = self);
    END;
    DEC (RT0u.inCritical);
    RETURN oldHooks;
  END RegisterHooks;

(* Return the id of the calling thread. *)
PROCEDURE MyId(): Id RAISES {}=
  BEGIN
    RETURN self.id;
  END MyId;

(*-------------------------------------------------------- initialization ---*)

BEGIN
  (* build the first thread, which runs on the original stack and is
     alone in the ring *)
  RT0u.inCritical := 1;
  topThread := NEW (T, state := State.alive, id := nextId);
  FloatMode.InitThread (topThread.floatState);
  INC (nextId);
  InitTopContext (topThread.context);
  self := topThread;
  pausedThreads := NIL;
  topThread.next := topThread;
  topThread.previous := topThread;
  RT0u.inCritical := 0;
  PerfStart ();

  (* preemption is currently forced off; the command-line test is kept
     here for reference *)
  (*preemption := NOT RTParams.IsPresent ("nopreemption");*)
  preemption:=FALSE;
END Thread.