(* Copyright (C) 1989, Digital Equipment Corporation           *)
(* All rights reserved.                                        *)
(* See the file COPYRIGHT for a full description.              *)

(* Last modified on Sat Jun 27 15:10:10 PDT 1992 by muller     *)
(*      modified on Wed Nov  6 10:47:51 PST 1991 by kalsow     *)
(*      modified on Thu Nov  2 18:18:07 1989 by gnelson        *)
(*      modified on Thu Nov  2 18:18:07 1989 by manasse        *)

MODULE AutoFlushWr;

IMPORT Wr, WrClass, Time, Thread;
FROM Wr IMPORT Failure;
FROM Thread IMPORT Alerted;

(* A writer wr is *inactive* if wr.cur=wr.lo, and *active* otherwise.
   Notice that if wr is inactive, c(wr) = target(wr) (by WrClass.V2) and
   therefore can be ignored by the worker thread that does the flushing.

   Our strategy is to keep a list containing all active writers.  The
   worker thread wakes up periodically and checks all the writers on the
   list, flushing them if they have gone unflushed for too long.  The
   delay between wake-ups is equal to the minimum delay of any active
   writer.  When the list is empty, the worker blocks. *)

CONST
  Forever = Time.T{seconds := LAST(INTEGER), microseconds := 999999};
  Default = Time.T{seconds := 0, microseconds := 100 * 1000};

  (* Largest number of whole seconds for which
     "seconds * 1000000 + microseconds" (with microseconds <= 999999)
     still fits in an INTEGER.  Longer delays are clamped by the worker. *)
  MaxPauseSeconds = (LAST(INTEGER) - 999999) DIV 1000000;

VAR
  mu := NEW (Thread.Mutex);
  (* mu protects the next four variables *)
  (* LL(wr) < LL(mu) for all wr: T *)
  active := NEW(T);
  minDelay: Time.T := Forever;
  c := NEW (Thread.Condition);   (* that active is nonempty *)
  worker: Thread.T := NIL;

(* The comment LL(wr) < LL(mu) means that the locking level of mu is
   greater than the locking level of any auto-flush writer.  This means
   that a thread must not try to acquire a writer lock while it holds mu. *)

(* The first object on the active list is a sentinel; thus the list is
   empty if active.link = NIL.  The value of minDelay is the minimum
   delay of any writer on the active list. *)

REVEAL
  T = Wr.T BRANDED OBJECT
        child : Wr.T;
        delay : Time.T;
        alarm : Time.T;   (* readonly if onQ is true *)
        onQ   : BOOLEAN;  (* protected by mu *)
        link  : T;        (* protected by mu *)
      OVERRIDES
        seek   := Seek;
        length := Length;
        flush  := Flush;
        close  := Close
      END;

(* For wr of type T, wr.child is the writer to which wr's writing is
   forwarded.  wr.delay is the inverse of the frequency with which wr
   will be flushed.  If wr is active, wr.alarm is the next time at which
   wr should be flushed; otherwise wr.alarm is irrelevant.  The boolean
   wr.onQ is true iff wr is on the active list, and wr.link is the link
   field for the active list.

   The child, delay, and alarm fields are protected by wr's mutex; the
   onQ and link fields are protected by the global mu. *)

PROCEDURE New(ch: Wr.T; p := -1): T =
  (* Return a new auto-flush writer that forwards its output to ch.
     p is the flush period in milliseconds; any negative value
     (including the default -1) selects Default. *)
  VAR res: T;
  BEGIN
    (* Acquire the lock before entering TRY, so that the FINALLY clause
       can only ever release a lock that is actually held. *)
    WrClass.Lock(ch);
    TRY
      res := NEW(T, buff := NIL, st := 0,
                 cur := ch.cur, lo := ch.cur, hi := ch.cur,
                 closed := ch.closed, seekable := ch.seekable,
                 buffered := TRUE, child := ch,
                 onQ := FALSE, link := NIL)
    FINALLY
      WrClass.Unlock(ch)
    END;
    IF p < 0 THEN
      (* A negative period would otherwise yield a negative delay. *)
      res.delay := Default
    ELSE
      res.delay.seconds := p DIV 1000;
      res.delay.microseconds := 1000 * (p MOD 1000)
    END;
    RETURN res
  END New;

CONST BuffSize = 500;

PROCEDURE Seek(wr: T; n: CARDINAL) RAISES {Failure, Alerted} =
  (* Flush wr, reposition it at n with a fresh buffer window, and make
     sure wr is on the active list so that the worker will flush it. *)
  VAR wasEmpty := FALSE;
  BEGIN
    wr.flush();
    (* When n = wr.cur this is just a request for more buffer space.
       Otherwise it is a real reposition, and the child must be moved
       as well or later output would land at the child's old position. *)
    IF n # wr.cur THEN Wr.Seek(wr.child, n) END;
    IF wr.buff = NIL THEN wr.buff := NEW(REF ARRAY OF CHAR, BuffSize) END;
    wr.cur := n;
    wr.lo := n;
    wr.hi := n + BuffSize - 1;
    LOCK mu DO
      IF NOT wr.onQ THEN
        (* Push wr right after the sentinel and arm its alarm. *)
        wr.onQ := TRUE;
        wasEmpty := active.link = NIL;
        wr.link := active.link;
        active.link := wr;
        wr.alarm := Time.Add(Time.Now(), wr.delay);
        IF wasEmpty THEN
          minDelay := wr.delay;
          IF worker = NIL THEN
            (* The worker thread is created lazily, on first use. *)
            worker := Thread.Fork(NEW(Thread.Closure, apply := Worker))
          END
        ELSIF Time.Compare(minDelay, wr.delay) > 0 THEN
          (* wr needs flushing sooner than anything already queued:
             shorten the worker's current sleep. *)
          minDelay := wr.delay;
          Thread.Alert(worker)
        END
      END
    END;
    IF wasEmpty THEN Thread.Signal(c) END
  END Seek;

PROCEDURE Flush(wr: T) RAISES {Failure, Alerted} =
  (* Forward any buffered characters to wr.child and flush the child.
     Afterwards wr.lo = wr.cur = wr.hi, i.e. wr is inactive and has no
     buffer space left. *)
  BEGIN
    IF wr.cur > wr.lo THEN
      Wr.PutString(wr.child, SUBARRAY(wr.buff^, wr.st, wr.cur - wr.lo));
      Wr.Flush(wr.child);
      wr.lo := wr.cur;
      wr.hi := wr.cur
    END
  END Flush;

PROCEDURE Length(wr: T): CARDINAL RAISES {Failure, Alerted} =
  (* The length is the larger of the child's length and wr.cur, since
     characters still buffered in wr have not reached the child. *)
  BEGIN
    RETURN MAX (Wr.Length (wr.child), wr.cur)
  END Length;

PROCEDURE Close(wr: T) RAISES {Failure, Alerted} =
  (* Close the child and drop wr's buffer. *)
  (* NOTE(review): wr is not removed from the active list here.  If
     wr.onQ is still TRUE the worker will later call Wr.Flush(wr) on
     it; confirm that this is harmless for a closed writer. *)
  BEGIN
    Wr.Close(wr.child);
    wr.buff := NIL
  END Close;

(* The worker thread delay is the minimum delay of any active writer.
   Each time it wakes up, it executes the following:

|    Let S be the set of active writers.
|    Set minDelay := infinity;
|    FOR wr IN S do:
|      IF wr's timer has expired THEN
|        Flush wr, remove it from the active list
|      ELSE
|        minDelay := min(minDelay, wr.delay)
|      END
|    END

   Notice that the active list can be enlarged while S is being
   processed, since clients may be operating on writers concurrently.
   If minDelay decreases while the worker is asleep, it is alerted. *)

PROCEDURE Worker (<*UNUSED*>cl: Thread.Closure): REFANY RAISES {} =
  <*FATAL Wr.Failure, Thread.Alerted*>
  VAR
    delay, now: Time.T;
    s: T;
    wr: T;
    micros: INTEGER;
  BEGIN
    LOOP
      (* Block until there is at least one writer on the active list. *)
      LOCK mu DO
        WHILE active.link = NIL DO Thread.Wait(mu, c) END;
        delay := minDelay
      END;

      (* Convert the delay to microseconds, clamping it so that the
         multiplication cannot overflow INTEGER for very long delays
         (or for Forever). *)
      IF delay.seconds > MaxPauseSeconds THEN
        micros := LAST(INTEGER)
      ELSE
        micros := delay.seconds * 1000000 + delay.microseconds
      END;

      (* Seek alerts this thread when minDelay decreases; being alerted
         simply ends the sleep early. *)
      TRY
        Time.Pause(micros)
      EXCEPT
        Thread.Alerted => (*skip*)
      END;

      LOCK mu DO
        s := active;
        minDelay := Forever;
      END;
      now := Time.Now();

      LOOP
        (* s scans through the set called S above. *)
        LOCK mu DO
          IF s.link = NIL THEN EXIT END;
          wr := s.link;
          IF Time.Compare(now, wr.alarm) > 0 THEN
            (* Expired: unlink wr; it is flushed below, outside mu. *)
            s.link := wr.link;
            wr.onQ := FALSE;
            wr.link := NIL
          ELSE
            (* Not yet due: keep it and fold its delay into minDelay. *)
            s := wr;
            wr := NIL;
            IF Time.Compare(minDelay, s.delay) > 0 THEN
              minDelay := s.delay
            END
          END
        END;
        IF wr # NIL THEN Wr.Flush(wr) END
      END
    END
  END Worker;

(* Notice that Wr.Flush(wr) cannot be called while mu is locked, since
   this violates the locking level order.  Thus wr is taken out of the
   lock to be flushed. *)

BEGIN
END AutoFlushWr.