(* Copyright (C) 1992, Digital Equipment Corporation *)
(* All rights reserved. *)
(* See the file COPYRIGHT for a full description. *)
(* Last modified on Wed Jul 1 21:19:07 1992 by rustan *)

(* Rustan Leino *)

(* Inter-processor message passing.  This module owns a single fixed
   receive buffer, a queue of outgoing messages, a queue of pending
   DecRef notifications, and the list of threads waiting for replies.
   Names such as Header, MsgType, GlobalID, Surrogate, NETWORK and
   sendInProgress are not declared here; presumably they come from the
   IPC interface -- verify there. *)

UNSAFE MODULE IPC;

IMPORT Word, RPC, System, Thread, ThreadF, RT0, RT0u, RTHeap, Processor;
FROM System IMPORT EnterSystemCritical, ExitSystemCritical;

(* ---------------------------- types -------------------------- *)

TYPE
  (* One outgoing message.  Nodes are chained through "next" both on
     SendQueue and on the free list availableSendNodes. *)
  SendNode = OBJECT
    next: SendNode := NIL;
    dest: INTEGER;          (* destination pid, handed to InitiateSend *)
    ack: Thread.T := NIL;   (* if non-NIL, scheduled by DoSendAftermath
                               once this node's send has completed *)
    msgFirst: ADDRESS;      (* first address of the message *)
    msgLast: ADDRESS        (* last address of the message; callers in
                               this module pass the address of the
                               final word, not one past it *)
  END;

  (* A send node that carries its own two-part payload (header followed
     by an address).  Used for the replies built in Interrupt.  Every
     node handed out by NewSendNode has this type. *)
  AddressReply = SendNode OBJECT
    header: Header;
    addr: ADDRESS
  END;

  (* One thread waiting for a Reply message, with the buffer into which
     the reply body is to be received. *)
  ReceiveNode = REF RECORD
    next: ReceiveNode := NIL;
    t: Thread.T := NIL;     (* the waiting thread; passed to
                               ThreadF.Suspend, which presumably fills
                               it in -- TODO confirm in ThreadF *)
    bufFirst: ADDRESS;
    bufLast: ADDRESS
  END;

(* -------------------------- variables ------------------------ *)

VAR
  (* header of the message currently being received *)
  ReceiveBuffer: Header;

  (* surrogates whose DecRef message has not been sent yet *)
  DecRefQueue: Surrogate := NIL;

  (* outgoing messages waiting for the current send to finish *)
  SendQueue: SendNode := NIL;

  (* free lists of recycled nodes *)
  availableSendNodes: AddressReply := NIL;
  availableReceiveNodes: ReceiveNode := NIL;

  (* node of the send in progress; NIL when idle or when the send in
     progress is a DecRef message (which uses DecRefSendBuffer) *)
  currSendNode: SendNode := NIL;

  (* the one buffer used for all outgoing DecRef messages; only its
     "param" field is rewritten per message *)
  DecRefSendBuffer: Header :=
    Header{ type := MsgType.DecRef, src := GlobalID{ 0, 0 }, param := 0 };

  (* threads suspended in SendAndReceiveBuffer, awaiting a Reply *)
  Receivers: ReceiveNode := NIL;

(* -------------------------- constants ------------------------ *)

CONST
  (* Bits of the interrupt status word as tested in this module.  In
     Interrupt, ISR_Receive and ISR_Buffer both set means the whole
     message has been received; ISR_Buffer alone means a body is still
     to be received. *)
  ISR_Receive = 1;
  ISR_Buffer = 4;
  ISR_Send = 2;
  ISR_RecBuf = Word.Or( ISR_Receive, ISR_Buffer );
  ISR_Used = Word.Or( ISR_RecBuf, ISR_Send );
  ISR_Unused = Word.Not( ISR_Used );

(* ---------------------------- code --------------------------- *)

PROCEDURE Interrupt( isr: Word.T ) =
  (* interrupt handler; intended to be called in the system sequence only *)
  (* "isr" is the interrupt status word to handle.  The handler keeps
     re-reading System.InterruptStatus() and loops until it reads 0. *)
  VAR
    receivedAll: BOOLEAN;
  BEGIN
    LOOP
      <* ASSERT Word.And( isr, ISR_Used ) # 0 AND
                Word.And( isr, ISR_Unused ) = 0 *>

      (* a send has completed: recycle its node and start the next one *)
      IF Word.And( isr, ISR_Send ) # 0 THEN
        System.EnterSystemCritical();
        DoSendAftermath();
        System.ExitSystemCritical()
      END;

      (* a message header has arrived in ReceiveBuffer *)
      IF Word.And( isr, ISR_RecBuf ) # 0 THEN
        (* check that the received type is really a MsgType value *)
        EVAL VAL( ORD( ReceiveBuffer.type ), MsgType );

        WITH rb = Word.And( isr, ISR_RecBuf ) DO
          IF rb = ISR_RecBuf THEN
            receivedAll := TRUE
          ELSIF rb = ISR_Buffer THEN
            receivedAll := FALSE
          ELSE
            <* ASSERT FALSE *>
          END
        END;

        CASE ReceiveBuffer.type OF
          MsgType.NewCall =>
            (* param is the body length in INTEGERs; the body is handed
               to the idle RPC thread if there is one, otherwise to a
               freshly forked thread *)
            VAR
              body: REF ARRAY OF INTEGER;
              bodySize := ReceiveBuffer.param;
            BEGIN
              <* ASSERT bodySize > 0 AND NOT receivedAll *>
              body := NEW( REF ARRAY OF INTEGER, bodySize );
              ReceiveBody( ADR( body[0] ), ADR( body[ bodySize-1 ] ));
              IF System.RPCThread = NIL THEN
                EVAL Thread.Fork( NEW( RPC.NewClosure,
                                       client := ReceiveBuffer.src,
                                       callDescr := body ))
              ELSE
                <* ASSERT System.RPCClosure # NIL *>
                System.RPCClosure.client := ReceiveBuffer.src;
                System.RPCClosure.callDescr := body;
                System.EnterSystemCritical();
                ThreadF.Schedule( System.RPCThread );
                System.RPCThread := NIL;
                System.ExitSystemCritical()
              END
            END
        | MsgType.Reply =>
            (* param identifies the waiting thread: unlink its node
               from Receivers, receive the body into its buffer, wake
               the thread and recycle the node *)
            VAR
              p: ReceiveNode := Receivers;
              back: ReceiveNode := NIL;
              receivingThread := LOOPHOLE( ReceiveBuffer.param, Thread.T );
            BEGIN
              WHILE p # NIL AND p.t # receivingThread DO
                back := p;
                p := p.next
              END;
              <* ASSERT p # NIL *>
              IF back = NIL THEN
                Receivers := p.next
              ELSE
                back.next := p.next
              END;
              IF NOT receivedAll THEN
                ReceiveBody( p.bufFirst, p.bufLast )
              END;
              EnterSystemCritical();
              ThreadF.Schedule( p.t );
              p.next := availableReceiveNodes;
              availableReceiveNodes := p;
              ExitSystemCritical()
            END
        | MsgType.IncRef =>
            (* param is the address of a local network object: bump its
               reference count and reply with its RT0u.types entry *)
            <* ASSERT receivedAll *>
            VAR
              netobj: NETWORK := LOOPHOLE( ReceiveBuffer.param, ROOT );
              reply: AddressReply := NewSendNode();
            BEGIN
              INC( netobj.refcount );
              reply.header.type := MsgType.Reply;
              reply.header.param := ReceiveBuffer.src.id;
              reply.addr := RT0u.types[ TYPECODE( netobj ) ];
              reply.dest := ReceiveBuffer.src.pid;
              reply.msgFirst := ADR( reply.header );
              reply.msgLast := ADR( reply.addr );
              EnterSystemCritical();
              EVAL Send( reply );
              ExitSystemCritical()
            END
        | MsgType.DecRef =>
            (* param is the address of a local network object *)
            <* ASSERT receivedAll *>
            VAR
              netobj: NETWORK := LOOPHOLE( ReceiveBuffer.param, ROOT );
            BEGIN
              DEC( netobj.refcount )
            END
        | MsgType.New =>
            (* param is a type definition: allocate an object of that
               type, receive its initial contents if a body follows,
               and reply with the new object's address *)
            VAR
              def := LOOPHOLE( ReceiveBuffer.param, RT0.TypeDefinition );
              newNetworkObject: NETWORK;
            BEGIN
              <* ASSERT def.remoteMethods # NIL *>
              newNetworkObject :=
                LOOPHOLE( RTHeap.Allocate( def ), NETWORK );
              IF NOT receivedAll THEN
                ReceiveBody(
                  LOOPHOLE( newNetworkObject, ADDRESS ),
                  LOOPHOLE( newNetworkObject, ADDRESS )
                    + def.dataSize - ADRSIZE( ADDRESS ))
              END;
              (* set after the body is received, so these two fields
                 always hold the local values *)
              newNetworkObject.pid := Processor.ID();
              newNetworkObject.refcount := 1;
              VAR
                reply: AddressReply := NewSendNode();
              BEGIN
                reply.header.type := MsgType.Reply;
                reply.header.param := ReceiveBuffer.src.id;
                reply.addr := LOOPHOLE( newNetworkObject, ADDRESS );
                reply.dest := ReceiveBuffer.src.pid;
                reply.msgFirst := ADR( reply.header );
                reply.msgLast := ADR( reply.addr );
                EnterSystemCritical();
                EVAL Send( reply );
                ExitSystemCritical()
              END
            END
        END;

        (* prepare to receive next message *)
        System.PrepareToReceive(
          ADR( ReceiveBuffer ),
          ADR( ReceiveBuffer ) + ADRSIZE( ReceiveBuffer )
            - ADRSIZE( ADDRESS ));
        System.AcknowledgeInterrupt( ISR_RecBuf )
      END;

      isr := System.InterruptStatus();
      IF isr = 0 THEN RETURN END
    END
  END Interrupt;

PROCEDURE ReceiveBody( first, last: ADDRESS ) =
  (* Receives a message body into the memory "first" .. "last", and
     busy-waits until both receive bits of the interrupt status are
     set. *)
  VAR
    isr: Word.T;
  BEGIN
    System.PrepareToReceive( first, last );
    System.AcknowledgeInterrupt( ISR_Buffer );
    LOOP
      isr := Word.And( System.InterruptStatus(), ISR_RecBuf );
      IF isr # 0 THEN
        <* ASSERT isr = ISR_RecBuf *>
        RETURN
      END
    END
  END ReceiveBody;

PROCEDURE DoSendAftermath() =
  (* REQUIRES inSystemCritical *)
  (* REQUIRES DecRefQueue # NIL ==> Thread.Self() # ThreadF.gcThread *)
  (* REQUIRES Word.And( System.InterruptStatus(), ISR_Send ) # 0 AND
              sendInProgress *)
  (* Finishes the completed send (wakes the thread waiting on it, if
     any, and recycles its node), then starts the next one.  Pending
     DecRef messages take priority over SendQueue. *)
  BEGIN
    <* ASSERT sendInProgress *>
    IF currSendNode # NIL THEN
      IF currSendNode.ack # NIL THEN
        ThreadF.Schedule( currSendNode.ack )
      END;
      currSendNode.next := availableSendNodes;
      availableSendNodes := currSendNode;
      currSendNode := NIL
    END;
    sendInProgress := FALSE;
    System.AcknowledgeInterrupt( ISR_Send );

    IF DecRefQueue # NIL THEN
      <* ASSERT DecRefSendBuffer.type = MsgType.DecRef *>
      DecRefSendBuffer.param := DecRefQueue.gid.id;
      InitiateSend( DecRefQueue.gid.pid,
                    ADR( DecRefSendBuffer ),
                    ADR( DecRefSendBuffer ) + ADRSIZE( DecRefSendBuffer )
                      - ADRSIZE( ADDRESS ));
      (* the surrogate's data now lives in DecRefSendBuffer, so the
         surrogate itself can be unlinked and freed *)
      VAR
        p: ADDRESS := DecRefQueue;
      BEGIN
        DecRefQueue := DecRefQueue.next;
        RTHeap.BackdoorDisposeUntraced( p )
      END
    ELSIF SendQueue # NIL THEN
      InitiateSend( SendQueue.dest, SendQueue.msgFirst, SendQueue.msgLast );
      <* ASSERT currSendNode = NIL *>
      currSendNode := SendQueue;
      SendQueue := SendQueue.next;
      currSendNode.next := NIL
    END
  END DoSendAftermath;

PROCEDURE EnqueueSurrogate( sur: Surrogate ): BOOLEAN =
  (* REQUIRES inSystemCritical *)
  (* To be called by the garbage collector only *)
  (* Adds the given surrogate object to DecRefQueue.  Returns TRUE if
     caller should call DISPOSE on sur, and FALSE if caller should not use
     the reference any more.  In either case, it is assumed no other
     references to the sur exist. *)
  (* Note that DoSendAftermath cannot be called if DecRefQueue # NIL *)
  BEGIN
    IF sendInProgress THEN
      (* Try to finish the send in progress so that the DecRef can go
         out at once.  This is only done while DecRefQueue is empty:
         with a non-empty queue DoSendAftermath would dispose a
         surrogate, which its REQUIRES clause forbids on the collector
         thread -- and this procedure runs on that thread. *)
      IF DecRefQueue = NIL THEN
        WHILE Word.And( System.InterruptStatus(), ISR_Send ) # 0 DO
          DoSendAftermath()
        END
      END;
      IF sendInProgress THEN
        (* still busy: leave the surrogate for DoSendAftermath *)
        sur.next := DecRefQueue;
        DecRefQueue := sur;
        RETURN FALSE
      END
    END;

    (* idle: send the DecRef right away from the shared buffer *)
    <* ASSERT DecRefSendBuffer.type = MsgType.DecRef *>
    DecRefSendBuffer.param := sur.gid.id;
    InitiateSend( sur.gid.pid,
                  ADR( DecRefSendBuffer ),
                  ADR( DecRefSendBuffer ) + ADRSIZE( DecRefSendBuffer )
                    - ADRSIZE( ADDRESS ));
    IF Word.And( System.InterruptStatus(), ISR_Send ) # 0 THEN
      DoSendAftermath()
    END;
    RETURN TRUE
  END EnqueueSurrogate;

PROCEDURE NewSendNode(): AddressReply =
  (* REQUIRES NOT System.inSystemCritical *)
  (* Returns a send node with "next" and "ack" NIL, taken from the free
     list if it is non-empty and freshly allocated otherwise. *)
  VAR
    sn: AddressReply := NIL;
  BEGIN
    EnterSystemCritical();
    IF availableSendNodes # NIL THEN
      sn := availableSendNodes;
      availableSendNodes := sn.next;
      sn.next := NIL;
      sn.ack := NIL
    END;
    ExitSystemCritical();
    IF sn # NIL THEN RETURN sn END;
    (* allocation happens outside the critical section *)
    RETURN NEW( AddressReply )
  END NewSendNode;

PROCEDURE NewReceiveNode(): ReceiveNode =
  (* REQUIRES NOT System.inSystemCritical *)
  (* Returns a receive node with "next" and "t" NIL, taken from the
     free list if it is non-empty and freshly allocated otherwise. *)
  VAR
    rn: ReceiveNode := NIL;
  BEGIN
    EnterSystemCritical();
    IF availableReceiveNodes # NIL THEN
      rn := availableReceiveNodes;
      availableReceiveNodes := rn.next;
      rn.next := NIL;
      rn.t := NIL
    END;
    ExitSystemCritical();
    IF rn # NIL THEN RETURN rn END;
    (* allocation happens outside the critical section *)
    RETURN NEW( ReceiveNode )
  END NewReceiveNode;

PROCEDURE EmptyAvailLists() =
  (* Drops both free lists.  NOTE(review): this does not enter a system
     critical section itself, unlike the other users of these lists --
     confirm what callers are expected to guarantee. *)
  BEGIN
    availableSendNodes := NIL;
    availableReceiveNodes := NIL
  END EmptyAvailLists;

PROCEDURE SendBuffer( destPid: INTEGER; bufFirst, bufLast: ADDRESS ) =
  (* this procedure is exported *)
  (* REQUIRES NOT inSystemCritical *)
  (* REQUIRES System.initialized *)
  (* Sends bufFirst .. bufLast to destPid.  If Send reports that the
     send has not completed yet, the caller is suspended on the node's
     "ack" field, which DoSendAftermath schedules on completion. *)
  VAR
    sn: SendNode := NewSendNode();
  BEGIN
    sn.dest := destPid;
    sn.msgFirst := bufFirst;
    sn.msgLast := bufLast;
    EnterSystemCritical();
    IF NOT Send( sn ) THEN
      ThreadF.Suspend( sn.ack )
    END;
    ExitSystemCritical()
  END SendBuffer;

PROCEDURE SendAndReceiveBuffer( destPid: INTEGER;
                                sendFirst, sendLast: ADDRESS;
                                recFirst, recLast: ADDRESS ) =
  (* this procedure is exported *)
  (* REQUIRES NOT inSystemCritical *)
  (* REQUIRES System.initialized *)
  (* Sends sendFirst .. sendLast to destPid, registers recFirst ..
     recLast on Receivers as the buffer for the reply, and suspends the
     caller.  The Reply case of Interrupt schedules the thread stored
     in the receive node. *)
  VAR
    rn: ReceiveNode := NewReceiveNode();
    sn: SendNode := NewSendNode();
  BEGIN
    sn.dest := destPid;
    sn.msgFirst := sendFirst;
    sn.msgLast := sendLast;
    rn.bufFirst := recFirst;
    rn.bufLast := recLast;
    <* ASSERT System.Initialized() *>
    EnterSystemCritical();
    rn.next := Receivers;
    Receivers := rn;
    EVAL Send( sn );
    ThreadF.Suspend( rn.t );
    ExitSystemCritical()
  END SendAndReceiveBuffer;

PROCEDURE Send( sn: SendNode ): BOOLEAN =
  (* REQUIRES inSystemCritical *)
  (* REQUIRES System.initialized AND sn.next = NIL *)
  (* Returns TRUE if the send has been completed by the time this
     procedure returns. *)
  BEGIN
    <* ASSERT System.Initialized() *>
    <* ASSERT sn.next = NIL *>
    IF sendInProgress THEN
      (* busy: append sn at the tail of SendQueue *)
      VAR
        p: SendNode := SendQueue;
        back: SendNode := NIL;
      BEGIN
        WHILE p # NIL DO
          back := p;
          p := p.next
        END;
        IF back = NIL THEN
          SendQueue := sn
        ELSE
          back.next := sn
        END
      END
    ELSE
      <* ASSERT SendQueue = NIL AND DecRefQueue = NIL AND
                currSendNode = NIL *>
      currSendNode := sn;
      InitiateSend( sn.dest, sn.msgFirst, sn.msgLast )
    END;

    (* process every send that has already completed; sn is the last
       queued message, so once nothing is in progress sn has gone out *)
    WHILE Word.And( System.InterruptStatus(), ISR_Send ) # 0 DO
      DoSendAftermath()
    END;
    RETURN NOT sendInProgress
  END Send;

PROCEDURE InitiateSend( destPid: INTEGER; msgFirst, msgLast: ADDRESS ) =
  (* REQUIRES inSystemCritical AND NOT sendInProgress *)
  (* ENSURES sendInProgress *)
  (* Starts sending msgFirst .. msgLast to the processor whose pid is
     destPid. *)
  BEGIN
    <* ASSERT NOT sendInProgress *>
    System.InitiateSend( msgFirst, msgLast, Processor.PidToDxdy( destPid ));
    sendInProgress := TRUE
  END InitiateSend;

PROCEDURE Initialize() =
  (* REQUIRES NOT System.initialized *)
  (* ENSURES System.initialized *)
  (* Initializes the System module, handing it ReceiveBuffer as the
     first receive buffer. *)
  BEGIN
    System.Initialize(
      ADR( ReceiveBuffer ),
      ADR( ReceiveBuffer ) + ADRSIZE( ReceiveBuffer ) - ADRSIZE( ADDRESS ))
  END Initialize;

BEGIN
END IPC.