(* Copyright (C) 1992, Digital Equipment Corporation *)
(* All rights reserved. *)
(* See the file COPYRIGHT for a full description. *)
(* Last modified on Wed Jul 1 21:19:07 1992 by rustan *)
(* Rustan Leino *)

(* Inter-processor message layer.  This module contains the interrupt
   handler that completes sends and dispatches received messages, the
   queues of pending sends / pending DecRef notifications / waiting
   receivers, and the exported blocking send and send-and-receive
   operations.

   Names used here but not declared in this module (Header, MsgType,
   GlobalID, Surrogate, sendInProgress) are expected to come from the
   exported interface, which is not visible from this file.

   Convention used throughout: a message or buffer is described by a
   pair of addresses (first, last), where "last" is computed as
   ADR(x) + ADRSIZE(x) - ADRSIZE(ADDRESS), i.e. the address of the
   final ADDRESS-sized unit rather than one-past-the-end. *)

UNSAFE MODULE IPC;

IMPORT Word, RPC, System, Thread, ThreadF, RT0, RT0u, RTHeap, Processor;
FROM System IMPORT EnterSystemCritical, ExitSystemCritical;

(* ---------------------------- types -------------------------- *)

TYPE
  (* Describes one outgoing message.  Nodes are linked through "next"
     while they sit on SendQueue or on the availableSendNodes free list. *)
  SendNode = OBJECT
    next: SendNode := NIL;
    dest: INTEGER;          (* destination pid, handed to InitiateSend *)
    ack: Thread.T := NIL;   (* if non-NIL, Interrupt schedules this thread
                               when the send completes *)
    msgFirst: ADDRESS;      (* first address of the message *)
    msgLast: ADDRESS        (* last address of the message *)
  END;

  (* A send node carrying its own small message: a header immediately
     followed by one address.  Interrupt sends it with
     msgFirst = ADR(header) and msgLast = ADR(addr).
     NewSendNode always hands out this subtype. *)
  AddressReply = SendNode OBJECT
    header: Header;
    addr: ADDRESS
  END;

  (* Describes a thread waiting for a reply and the buffer into which the
     reply body is to be received. *)
  ReceiveNode = REF RECORD
    next: ReceiveNode := NIL;
    t: Thread.T := NIL;     (* the waiting thread; compared against the
                               incoming reply's param in Interrupt *)
    bufFirst: ADDRESS;
    bufLast: ADDRESS
  END;

(* -------------------------- variables ------------------------ *)

VAR
  (* Buffer into which the header of every incoming message is received. *)
  ReceiveBuffer: Header;
  receiveBody: BOOLEAN; (* FALSE if receiving header; TRUE if receiving body *)
  (* Surrogates whose DecRef message could not be sent immediately;
     drained by Interrupt on send completion, ahead of SendQueue. *)
  DecRefQueue: Surrogate := NIL;
  (* FIFO of sends waiting for the current send to complete
     (Send appends at the tail, Interrupt removes from the head). *)
  SendQueue: SendNode := NIL;
  (* Free lists of recycled nodes. *)
  availableSendNodes: AddressReply := NIL;
  availableReceiveNodes: ReceiveNode := NIL;
  (* Node of the send currently in progress; stays NIL while a DecRef
     message (which uses DecRefSendBuffer, not a node) is being sent. *)
  currSendNode: SendNode := NIL;
  (* Single preallocated message used for all outgoing DecRef messages;
     only its param field is rewritten before each send. *)
  DecRefSendBuffer: Header :=
    Header{ type := MsgType.DecRef, src := GlobalID{ 0, 0 }, param := 0 };
  (* Body of the NewCall message currently being received, if any. *)
  body: REF ARRAY OF INTEGER := NIL;
  (* Object allocated for the MsgType.New message currently being
     received, if any. *)
  newNetworkObject: NETWORK := NIL;
  (* Threads waiting for replies; Interrupt moves the node matching an
     incoming reply to the front of this list. *)
  Receivers: ReceiveNode := NIL;

(* -------------------------- constants ------------------------ *)

(* Bit masks for the interrupt status word passed to Interrupt.
   As used below: ISR_Send signals completion of a send; both receive
   bits set (ISR_RecBuf) means the whole message has arrived; ISR_Buffer
   alone means the buffer was filled but more of the message follows.
   ISR_Receive alone is rejected by an assertion. *)
CONST
  ISR_Receive = 1;
  ISR_Buffer = 4;
  ISR_Send = 2;
  ISR_RecBuf = Word.Or( ISR_Receive, ISR_Buffer );
  ISR_Used = Word.Or( ISR_RecBuf, ISR_Send );
  ISR_Unused = Word.Not( ISR_Used );

(* ---------------------------- code --------------------------- *)

(* Handles one interrupt described by the status word "isr".

   Send completion (ISR_Send set): wakes the thread waiting on the
   finished node (if any), recycles the node, acknowledges the send, and
   starts the next send -- a queued DecRef first, otherwise the head of
   SendQueue.

   Receive (either bit of ISR_RecBuf set): dispatches on the type of the
   header in ReceiveBuffer and then tells the system where to receive the
   next header or body.

   Returns isr with the ISR_Send bit cleared; the receive bits are
   returned unchanged. *)
PROCEDURE Interrupt( isr: Word.T ): Word.T =
  (* interrupt handler; intended to be called in the system sequence only *)
  VAR
    recFirst, recLast: ADDRESS;   (* where the next receive should go *)
    done: BOOLEAN := FALSE;       (* set when a Reply/New message is complete *)
    receivedAll: BOOLEAN;
  BEGIN
    <* ASSERT Word.And( isr, ISR_Used ) # 0 AND Word.And( isr, ISR_Unused ) = 0 *>

    IF Word.And( isr, ISR_Send ) # 0 THEN
      <* ASSERT sendInProgress *>
      IF currSendNode # NIL THEN
        (* wake the sender, if one is waiting, and put the node back on
           the free list *)
        EnterSystemCritical();
        IF currSendNode.ack # NIL THEN ThreadF.Schedule( currSendNode.ack ) END;
        currSendNode.next := availableSendNodes;
        availableSendNodes := currSendNode;
        currSendNode := NIL;
        ExitSystemCritical();
      END;
      sendInProgress := FALSE;
      System.AcknowledgeSend();
      isr := Word.And( isr, Word.Not( ISR_Send ));

      IF DecRefQueue # NIL THEN
        (* pending DecRef notifications go out before ordinary sends *)
        <* ASSERT DecRefSendBuffer.type = MsgType.DecRef *>
        DecRefSendBuffer.param := DecRefQueue.gid.id;
        EnterSystemCritical();
        InitiateSend( DecRefQueue.gid.pid,
                      ADR( DecRefSendBuffer ),
                      ADR( DecRefSendBuffer ) + ADRSIZE( DecRefSendBuffer ) - ADRSIZE( ADDRESS ));
        ExitSystemCritical();
        (* the surrogate's fields have been copied into the send; unlink
           and free it *)
        VAR p := DecRefQueue; BEGIN
          DecRefQueue := DecRefQueue.next;
          DISPOSE( p )
        END
      ELSIF SendQueue # NIL THEN
        (* start the oldest queued send and make it the current node *)
        EnterSystemCritical();
        InitiateSend( SendQueue.dest, SendQueue.msgFirst, SendQueue.msgLast );
        ExitSystemCritical();
        <* ASSERT currSendNode = NIL *>
        currSendNode := SendQueue;
        SendQueue := SendQueue.next;
        currSendNode.next := NIL
      END
    END;

    IF Word.And( isr, ISR_RecBuf ) # 0 THEN
      (* check that the received type is really a MsgType value *)
      EVAL VAL( ORD( ReceiveBuffer.type ), MsgType );
      IF Word.And( isr, ISR_RecBuf ) = ISR_RecBuf THEN
        receivedAll := TRUE
      ELSIF Word.And( isr, ISR_RecBuf ) = ISR_Buffer THEN
        receivedAll := FALSE
      ELSE
        <* ASSERT FALSE *>
      END;

      (* default: the next thing to receive is a header; the cases below
         override this when a message body is still to come *)
      recFirst := ADR( ReceiveBuffer );
      recLast := ADR( ReceiveBuffer ) + ADRSIZE( ReceiveBuffer ) - ADRSIZE( ADDRESS );

      CASE ReceiveBuffer.type OF
        MsgType.NewCall =>
          IF NOT receiveBody THEN
            (* header received: param is used as the number of INTEGERs
               in the body; allocate it and receive into it *)
            <* ASSERT ReceiveBuffer.param > 0 AND NOT receivedAll AND body = NIL *>
            body := NEW( REF ARRAY OF INTEGER, ReceiveBuffer.param );
            recFirst := ADR( body[0] );
            recLast := ADR( body[ ReceiveBuffer.param-1 ] );
            receiveBody := TRUE
          ELSE
            (* body received: hand the call to the parked RPC thread if
               there is one, otherwise fork a new thread for it *)
            <* ASSERT receivedAll AND body # NIL AND NUMBER( body^ ) > 0 *>
            IF System.RPCThread = NIL THEN
              EVAL Thread.Fork( NEW( RPC.NewClosure,
                                     client := ReceiveBuffer.src,
                                     callDescr := body ))
            ELSE
              <* ASSERT System.RPCClosure # NIL *>
              System.RPCClosure.client := ReceiveBuffer.src;
              System.RPCClosure.callDescr := body;
              System.EnterSystemCritical();
              ThreadF.Schedule( System.RPCThread );
              System.RPCThread := NIL;
              System.ExitSystemCritical()
            END;
            body := NIL;
            receiveBody := FALSE
          END

      | MsgType.Reply =>
          IF NOT receiveBody THEN
            (* header received: param is reinterpreted as the waiting
               thread; find its node and move it to the front of
               Receivers *)
            VAR
              p: ReceiveNode := Receivers;
              back: ReceiveNode := NIL;
              receivingThread := LOOPHOLE( ReceiveBuffer.param, Thread.T );
            BEGIN
              WHILE p # NIL AND p.t # receivingThread DO
                back := p;
                p := p.next
              END;
              <* ASSERT p # NIL *>
              IF back # NIL THEN
                back.next := p.next;
                p.next := Receivers;
                Receivers := p
              END
              (* p is now the first node on Receivers *)
            END;
            IF receivedAll THEN
              (* the reply consisted of the header only *)
              done := TRUE
            ELSE
              (* receive the body directly into the waiter's buffer *)
              recFirst := Receivers.bufFirst;
              recLast := Receivers.bufLast;
              receiveBody := TRUE
            END
          ELSE
            <* ASSERT receivedAll *>
            done := TRUE
          END;
          IF done THEN
            (* reply complete: unlink the front node, wake its thread and
               recycle the node *)
            VAR p := Receivers; BEGIN
              Receivers := Receivers.next;
              EnterSystemCritical();
              ThreadF.Schedule( p.t );
              p.next := availableReceiveNodes;
              availableReceiveNodes := p;
              ExitSystemCritical()
            END;
            receiveBody := FALSE
          END

      | MsgType.IncRef =>
          (* param is reinterpreted as a local network object: bump its
             reference count and reply with the RT0u.types entry for the
             object's typecode *)
          <* ASSERT NOT receiveBody AND receivedAll *>
          VAR
            netobj: NETWORK := LOOPHOLE( ReceiveBuffer.param, ROOT );
            reply: AddressReply := NewSendNode();
          BEGIN
            INC( netobj.refcount );
            reply.header.type := MsgType.Reply;
            reply.header.param := ReceiveBuffer.src.id;
            reply.addr := RT0u.types[ TYPECODE( netobj ) ];
            reply.dest := ReceiveBuffer.src.pid;
            reply.msgFirst := ADR( reply.header );
            reply.msgLast := ADR( reply.addr );
            EnterSystemCritical();
            Send( reply );
            ExitSystemCritical()
          END

      | MsgType.DecRef =>
          (* param is reinterpreted as a local network object: drop one
             reference.  Nothing is done here when the count reaches 0. *)
          <* ASSERT NOT receiveBody AND receivedAll *>
          VAR netobj: NETWORK := LOOPHOLE( ReceiveBuffer.param, ROOT ); BEGIN
            DEC( netobj.refcount )
          END

      | MsgType.New =>
          (* param is reinterpreted as a type definition: allocate an
             object of that type, optionally receive its contents, then
             reply with the new object's address *)
          VAR def := LOOPHOLE( ReceiveBuffer.param, RT0.TypeDefinition ); BEGIN
            <* ASSERT def.remoteMethods # NIL *>
            IF NOT receiveBody THEN
              <* ASSERT newNetworkObject = NIL *>
              newNetworkObject := LOOPHOLE( RTHeap.Allocate( def ), NETWORK );
              IF receivedAll THEN
                done := TRUE
              ELSE
                (* receive the body straight into the new object, over
                   def.dataSize address units starting at the object's
                   address *)
                recFirst := LOOPHOLE( newNetworkObject, ADDRESS );
                recLast := LOOPHOLE( newNetworkObject, ADDRESS )
                           + def.dataSize - ADRSIZE( ADDRESS );
                receiveBody := TRUE
              END
            ELSE
              <* ASSERT receivedAll *>
              done := TRUE
            END;
            IF done THEN
              (* pid and refcount are assigned after the body has been
                 received, so they hold the local values regardless of
                 what was received into the object *)
              newNetworkObject.pid := Processor.ID();
              newNetworkObject.refcount := 1;
              VAR reply: AddressReply := NewSendNode(); BEGIN
                reply.header.type := MsgType.Reply;
                reply.header.param := ReceiveBuffer.src.id;
                reply.addr := LOOPHOLE( newNetworkObject, ADDRESS );
                reply.dest := ReceiveBuffer.src.pid;
                reply.msgFirst := ADR( reply.header );
                reply.msgLast := ADR( reply.addr );
                EnterSystemCritical();
                Send( reply );
                ExitSystemCritical()
              END;
              newNetworkObject := NIL;
              receiveBody := FALSE
            END
          END
      END;

      (* receive next (part of) message *)
      System.PrepareToReceive( recFirst, recLast )
    END;

    RETURN isr
  END Interrupt;

(* Arranges for a DecRef message to be sent for surrogate "sur"
   (destination sur.gid.pid, param sur.gid.id).

   Returns TRUE if the send was started immediately.  Returns FALSE if a
   send was already in progress, in which case "sur" is pushed onto
   DecRefQueue; Interrupt later sends the message and DISPOSEs the
   surrogate.  In the TRUE case "sur" is not disposed here --
   presumably the caller does that; confirm at the call sites. *)
PROCEDURE EnqueueSurrogate( sur: Surrogate ): BOOLEAN =
  (* REQUIRES inSystemCritical *)
  BEGIN
    IF sendInProgress THEN
      sur.next := DecRefQueue;
      DecRefQueue := sur;
      RETURN FALSE
    ELSE
      <* ASSERT DecRefSendBuffer.type = MsgType.DecRef *>
      DecRefSendBuffer.param := sur.gid.id;
      InitiateSend( sur.gid.pid,
                    ADR( DecRefSendBuffer ),
                    ADR( DecRefSendBuffer ) + ADRSIZE( DecRefSendBuffer ) - ADRSIZE( ADDRESS ));
      RETURN TRUE
    END
  END EnqueueSurrogate;

(* Returns a send node with next = NIL and ack = NIL: the head of the
   availableSendNodes free list if it is non-empty, otherwise a freshly
   allocated AddressReply.  Other fields of a recycled node keep their
   previous values. *)
PROCEDURE NewSendNode(): AddressReply =
  (* REQUIRES NOT System.inSystemCritical *)
  VAR sn: AddressReply := NIL;
  BEGIN
    EnterSystemCritical();
    IF availableSendNodes # NIL THEN
      sn := availableSendNodes;
      availableSendNodes := sn.next;
      sn.next := NIL;
      sn.ack := NIL
    END;
    ExitSystemCritical();
    (* allocation is done outside the critical section *)
    IF sn # NIL THEN RETURN sn END;
    RETURN NEW( AddressReply )
  END NewSendNode;

(* Returns a receive node with next = NIL and t = NIL: the head of the
   availableReceiveNodes free list if it is non-empty, otherwise a
   freshly allocated node.  The buffer fields of a recycled node keep
   their previous values. *)
PROCEDURE NewReceiveNode(): ReceiveNode =
  (* REQUIRES NOT System.inSystemCritical *)
  VAR rn: ReceiveNode := NIL;
  BEGIN
    EnterSystemCritical();
    IF availableReceiveNodes # NIL THEN
      rn := availableReceiveNodes;
      availableReceiveNodes := rn.next;
      rn.next := NIL;
      rn.t := NIL
    END;
    ExitSystemCritical();
    (* allocation is done outside the critical section *)
    IF rn # NIL THEN RETURN rn END;
    RETURN NEW( ReceiveNode )
  END NewReceiveNode;

(* Drops both free lists by resetting their heads to NIL.
   NOTE(review): unlike the other accessors of these lists, this does not
   enter a system critical section itself -- confirm that callers
   guarantee exclusion. *)
PROCEDURE EmptyAvailLists() =
  BEGIN
    availableSendNodes := NIL;
    availableReceiveNodes := NIL
  END EmptyAvailLists;

(* Sends the message occupying bufFirst..bufLast to processor destPid.

   The calling thread is suspended on the node's ack field unless the
   send was started immediately (no send was in progress on entry) and
   the interrupt status already shows ISR_Send, i.e. the send has
   already completed.  Interrupt schedules the thread stored in ack when
   the send completes.
   Presumably ThreadF.Suspend stores the calling thread in its argument
   before blocking -- confirm in ThreadF. *)
PROCEDURE SendBuffer( destPid: INTEGER; bufFirst, bufLast: ADDRESS ) =
  (* this procedure is exported *)
  (* REQUIRES System.initialized *)
  VAR
    sn: SendNode := NewSendNode();
    oldSendInProgress: BOOLEAN;
  BEGIN
    sn.dest := destPid;
    sn.msgFirst := bufFirst;
    sn.msgLast := bufLast;
    EnterSystemCritical();
    (* sample the flag before Send, which sets it when it starts a send *)
    oldSendInProgress := sendInProgress;
    Send( sn );
    IF oldSendInProgress
       OR Word.And( System.GetInterruptStatus(), ISR_Send ) = 0 THEN
      ThreadF.Suspend( sn.ack )
    END;
    ExitSystemCritical()
  END SendBuffer;

(* Sends the message occupying sendFirst..sendLast to processor destPid
   and suspends the calling thread until Interrupt reschedules it from
   its MsgType.Reply case.  A reply body, if any, is received into
   recFirst..recLast.

   The receive node is pushed onto Receivers before the send is issued.
   The caller does not wait for the send itself to complete (the send
   node's ack stays NIL).
   Presumably ThreadF.Suspend stores the calling thread in rn.t, which
   is what Interrupt compares against the reply header's param --
   confirm in ThreadF.
   NOTE(review): the original comment below says "sending" Reply
   messages; from the code, the Reply is what this procedure waits for --
   confirm the intended wording. *)
PROCEDURE SendAndReceiveBuffer( destPid: INTEGER;
                                sendFirst, sendLast: ADDRESS;
                                recFirst, recLast: ADDRESS ) =
  (* this procedure is exported *)
  (* only for sending MsgType.Reply messages *)
  (* REQUIRES System.initialized *)
  VAR
    rn: ReceiveNode := NewReceiveNode();
    sn: SendNode := NewSendNode();
  BEGIN
    sn.dest := destPid;
    sn.msgFirst := sendFirst;
    sn.msgLast := sendLast;
    rn.bufFirst := recFirst;
    rn.bufLast := recLast;
    <* ASSERT System.Initialized() *>
    EnterSystemCritical();
    rn.next := Receivers;
    Receivers := rn;
    Send( sn );
    ThreadF.Suspend( rn.t );
    ExitSystemCritical()
  END SendAndReceiveBuffer;

(* Starts sending "sn" if no send is in progress (making it
   currSendNode); otherwise appends it to the tail of SendQueue, to be
   started by Interrupt when earlier sends have completed. *)
PROCEDURE Send( sn: SendNode ) =
  (* REQUIRES inSystemCritical *)
  (* REQUIRES System.initialized AND sn.next = NIL *)
  BEGIN
    <* ASSERT System.Initialized() *>
    <* ASSERT sn.next = NIL *>
    IF sendInProgress THEN
      (* walk to the end of the queue; "back" ends up as the last node,
         or NIL if the queue is empty *)
      VAR
        p: SendNode := SendQueue;
        back: SendNode := NIL;
      BEGIN
        WHILE p # NIL DO
          back := p;
          p := p.next
        END;
        IF back = NIL THEN
          SendQueue := sn
        ELSE
          back.next := sn
        END
      END
    ELSE
      <* ASSERT SendQueue = NIL AND DecRefQueue = NIL AND currSendNode = NIL *>
      currSendNode := sn;
      InitiateSend( sn.dest, sn.msgFirst, sn.msgLast )
    END
  END Send;

(* Hands the message msgFirst..msgLast to System.InitiateSend, with
   destPid translated by Processor.PidToDxdy, and records that a send is
   now in progress. *)
PROCEDURE InitiateSend( destPid: INTEGER; msgFirst, msgLast: ADDRESS ) =
  (* REQUIRES inSystemCritical AND NOT sendInProgress *)
  (* ENSURES sendInProgress *)
  BEGIN
    <* ASSERT NOT sendInProgress *>
    System.InitiateSend( msgFirst, msgLast, Processor.PidToDxdy( destPid ));
    sendInProgress := TRUE
  END InitiateSend;

(* Initializes the system layer, passing it ReceiveBuffer as the buffer
   for the first incoming header. *)
PROCEDURE Initialize() =
  (* REQUIRES NOT System.initialized *)
  (* ENSURES System.initialized *)
  BEGIN
    System.Initialize( ADR( ReceiveBuffer ),
                       ADR( ReceiveBuffer ) + ADRSIZE( ReceiveBuffer ) - ADRSIZE( ADDRESS ))
  END Initialize;

BEGIN
END IPC.