(* primes: a pipelined prime sieve.

   Generate feeds the integers 3..Max into a chain of Filter objects.  Each
   Filter owns a small bounded Buffer and a forked thread (see Init) that
   drops multiples of the filter's own number and hands everything else to
   the next stage.  Numbers that reach the end of the chain are given to a
   PrimeConsumer, which prints them and counts them. *)
MODULE primes EXPORTS Main;

IMPORT SmallIO, Thread, Processor;

CONST
  Max = 30000;   (* last candidate that Generate pushes into the chain *)

(* ------------------------------------------------------------------ *)
(* Bounded buffer                                                      *)
(* ------------------------------------------------------------------ *)

TYPE
  (* FIFO queue of integers stored in a circular array.  All state is
     read and written inside LOCK mu; put blocks while the array is
     full and get blocks while it is empty. *)
  Buffer = OBJECT
    mu: MUTEX;
    notEmpty, notFull: Thread.Condition;
    a: REF ARRAY OF INTEGER;      (* storage, allocated by init *)
    n: CARDINAL := 0;             (* number of stored elements *)
    nextGet: CARDINAL := 0;       (* index of the next element to remove *)
    nextPut: CARDINAL := 0        (* index of the next free slot *)
  METHODS
    init( size: [1..LAST(INTEGER)] := 1 ): Buffer := BufInit;
    put( x: INTEGER ) := Put;
    get(): INTEGER := Get
  END;

(* Allocate the mutex, both condition variables and a storage array of
   `size` elements, then return buf itself so calls can be chained. *)
PROCEDURE BufInit( buf: Buffer; size: [1..LAST(INTEGER)] ): Buffer =
  BEGIN
    buf.a := NEW( REF ARRAY OF INTEGER, size );
    buf.mu := NEW( MUTEX );
    buf.notEmpty := NEW( Thread.Condition );
    buf.notFull := NEW( Thread.Condition );
    RETURN buf
  END BufInit;

(* Append x, waiting on notFull while every slot is occupied, then
   signal notEmpty. *)
PROCEDURE Put( buf: Buffer; x: INTEGER ) =
  VAR
    capacity: CARDINAL;
  BEGIN
    LOCK buf.mu DO
      capacity := NUMBER( buf.a^ );
      (* Re-test after every wake-up. *)
      WHILE buf.n = capacity DO
        Thread.Wait( buf.mu, buf.notFull )
      END;
      buf.a[ buf.nextPut ] := x;
      buf.nextPut := ( buf.nextPut + 1 ) MOD capacity;   (* wrap around *)
      INC( buf.n );
      Thread.Signal( buf.notEmpty )
    END
  END Put;

(* Remove and return the oldest element, waiting on notEmpty while the
   buffer holds nothing, then signal notFull. *)
PROCEDURE Get( buf: Buffer ): INTEGER =
  VAR
    item: INTEGER;
  BEGIN
    LOCK buf.mu DO
      WHILE buf.n = 0 DO
        Thread.Wait( buf.mu, buf.notEmpty )
      END;
      item := buf.a[ buf.nextGet ];
      buf.nextGet := ( buf.nextGet + 1 ) MOD NUMBER( buf.a^ );
      DEC( buf.n );
      Thread.Signal( buf.notFull )
    END;
    RETURN item
  END Get;

(* ------------------------------------------------------------------ *)
(* Processor selection                                                 *)
(* ------------------------------------------------------------------ *)

(* Return Processor.Min() plus id reduced modulo Processor.Count(). *)
PROCEDURE ProcFromID( id: INTEGER ): INTEGER =
  BEGIN
    RETURN Processor.Min() + ( id MOD Processor.Count() )
  END ProcFromID;

(* ------------------------------------------------------------------ *)
(* Prime consumers                                                     *)
(* ------------------------------------------------------------------ *)

TYPE
  (* Receives the numbers the sieve reports.  `end` has no default
     implementation here; subtypes supply it. *)
  PrimeConsumer = NETWORK OBJECT
    primeCount: INTEGER := 0;     (* how many numbers consume has seen *)
  METHODS
    consume( x: INTEGER ): INTEGER := Consume;
    end()
  END;

(* Print x followed by a newline and bump the counter.  The result is
   the counter value from before this call. *)
PROCEDURE Consume( p: PrimeConsumer; x: INTEGER ): INTEGER =
  VAR
    ordinal := p.primeCount;
  BEGIN
    SmallIO.PutInt( x );
    SmallIO.PutChar( '\n' );
    INC( p.primeCount );
    RETURN ordinal
  END Consume;

TYPE
  (* PrimeConsumer whose owner can block in wait() until end() has been
     called.  mu and c are not given defaults; the creator supplies them. *)
  PrmCons = PrimeConsumer OBJECT
    done: BOOLEAN := FALSE;
    mu: MUTEX;
    c: Thread.Condition
  METHODS
    wait() := Wait
  OVERRIDES
    end := EndPrmCons
  END;

(* Record completion and signal the condition that Wait blocks on. *)
PROCEDURE EndPrmCons( p: PrmCons ) =
  BEGIN
    LOCK p.mu DO
      p.done := TRUE;
      Thread.Signal( p.c )
    END
  END EndPrmCons;

(* Block until done is set, then print the final count. *)
PROCEDURE Wait( p: PrmCons ) =
  BEGIN
    LOCK p.mu DO
      WHILE NOT p.done DO
        Thread.Wait( p.mu, p.c )
      END
    END;
    SmallIO.PutText( "A total of " );
    SmallIO.PutInt( p.primeCount );
    SmallIO.PutText( " primes\n" )
  END Wait;

(* ------------------------------------------------------------------ *)
(* Filters                                                             *)
(* ------------------------------------------------------------------ *)

TYPE
  (* One stage of the sieve.  try() queues a candidate and end() queues
     the terminating 0; the real work happens in the closure's thread. *)
  Filter = NETWORK OBJECT
    cl: FilterClosure
  METHODS
    init( n: INTEGER; p: PrimeConsumer ): Filter := Init;
    try( x: INTEGER ) := Try;
    end() := End
  END;

  (* Per-filter thread state. *)
  FilterClosure = Thread.Closure OBJECT
    buf: Buffer;                  (* candidates queued by try()/end() *)
    n: INTEGER;                   (* this filter's number *)
    next: Filter := NIL;          (* following stage, created on demand *)
    p: PrimeConsumer;
    id: INTEGER                   (* value returned by p.consume( n ) *)
  OVERRIDES
    apply := Apply
  END;

(* Set up filter f for the number n: build its closure with a default
   sized buffer, report n to the consumer (keeping the returned value as
   the filter's id), and fork a thread that runs the closure's apply. *)
PROCEDURE Init( f: Filter; n: INTEGER; p: PrimeConsumer ): Filter =
  BEGIN
    WITH closure = NEW( FilterClosure,
                        buf := NEW( Buffer ).init(),
                        n := n,
                        p := p ) DO
      f.cl := closure;
      closure.id := p.consume( n );
      EVAL Thread.Fork( closure )   (* new thread executes closure.apply() *)
    END;
    RETURN f
  END Init;

(* Queue candidate x for this filter's thread. *)
PROCEDURE Try( f: Filter; x: INTEGER ) =
  BEGIN
    f.cl.buf.put( x )
  END Try;

(* Queue 0, the value on which Apply leaves its loop. *)
PROCEDURE End( f: Filter ) =
  BEGIN
    f.cl.buf.put( 0 )
  END End;

(* Thread body of a filter.  Reads candidates until a 0 arrives.
   Multiples of cl.n are dropped.  Anything else goes to the next filter
   if there is one; otherwise it is reported straight to the consumer
   when cl.n * cl.n >= Max, or becomes the number of a newly created next
   filter.  On termination the 0 is propagated down the chain, and the
   filter without a successor prints the filter count and ends the
   consumer.

   NOTE(review): the second argument of NEW( Filter, ... ) presumably
   selects the processor for the new object - confirm against the
   dialect's documentation. *)
PROCEDURE Apply( cl: FilterClosure ): REFANY =
  VAR
    candidate: INTEGER;
  BEGIN
    candidate := cl.buf.get();
    WHILE candidate # 0 DO
      IF candidate MOD cl.n # 0 THEN
        IF cl.next # NIL THEN
          cl.next.try( candidate )
        ELSIF cl.n * cl.n >= Max THEN
          EVAL cl.p.consume( candidate )
        ELSE
          cl.next := NEW( Filter, ProcFromID( cl.id+1 )).init( candidate, cl.p )
        END
      END;
      candidate := cl.buf.get()
    END;

    IF cl.next = NIL THEN
      SmallIO.PutText( "Used " );
      SmallIO.PutInt( cl.id+1 );
      SmallIO.PutText( " Filter objects\n" );
      cl.p.end()
    ELSE
      cl.next.end()
    END;
    RETURN NIL
  END Apply;

(* ------------------------------------------------------------------ *)
(* Generator and main program                                          *)
(* ------------------------------------------------------------------ *)

TYPE
  G = OBJECT
  METHODS
    generate( p: PrimeConsumer ) := Generate
  END;

(* Create the first filter for the number 2, feed it 3..Max in order,
   then tell it that the input has ended. *)
PROCEDURE Generate( <* UNUSED *> g: G; p: PrimeConsumer ) =
  VAR
    head: Filter;
  BEGIN
    head := NEW( Filter, ProcFromID( 0 )).init( 2, p );
    FOR candidate := 3 TO Max DO
      head.try( candidate )
    END;
    head.end()
  END Generate;

BEGIN
  (* Run the generator against a waitable consumer, then block until the
     chain reports completion and the total has been printed. *)
  WITH sink = NEW( PrmCons, mu := NEW( MUTEX ), c := NEW( Thread.Condition )) DO
    NEW( G ).generate( sink );
    sink.wait()
  END
END primes.