(* Copyright (C) 1992, Digital Equipment Corporation *)
(* All rights reserved. *)
(* See the file COPYRIGHT for a full description. *)
(* Last modified on Wed Jul 1 21:19:07 1992 by rustan *)

(* Rustan Leino *)

(* Implementation of remote procedure call and remote NEW on top of the
   IPC message layer.  The module contains:

     - Client / Server: the two ends of a remote call;
     - RemoteNew: allocation of an object on another processor;
     - Marshal / Unmarshal / UnmarshalOne / GetGlobalID: conversion
       between local NETWORK references and (pid, id) pairs.

   NOTE(review): throughout this module ADRSIZE values are used directly
   as element counts and indices of ARRAY OF Word.T buffers.  That is only
   correct if one addressable unit is one Word.T on the target machine --
   confirm before porting. *)

UNSAFE MODULE RPC;

IMPORT Word, IPC, RT0, RTException, RTExceptSafe, Thread, Processor, RTHeap;
IMPORT Surrogate, System, RTMisc;

TYPE
  (* Placed in a call message immediately after the IPC.Header: identifies
     the procedure to call and describes its signature. *)
  CallDescrHeader = RECORD
    proc: ADDRESS;
    pt: ProcType
  END;
  CallDescrHeaderPtr = UNTRACED REF CallDescrHeader;

  (* Reply payload received by Client and filled in by Server. *)
  ReturnDescr = RECORD
    serverThread: INTEGER;
      (* Thread.Self() of the server thread when it awaits an
         acknowledgement; 0 otherwise *)
    exception: RTExceptSafe.ExceptionName;
      (* NIL for a normal outcome *)
    word0: Word.T;
      (* the result word, or the pid of a returned network object *)
    word1: Word.T
      (* the id of a returned network object *)
  END;

REVEAL
  ProcType = UNTRACED BRANDED REF RECORD
    returnType: ReturnValue;
    params: ParamsType
  END;

TYPE
  ReturnValue = { None, Word, NetworkObject };

  ParamsType = UNTRACED REF RECORD
    encodingSize: CARDINAL;
      (* size of the marshalled arguments; added to the header sizes when
         Client sizes its message buffer *)
    formalsSize: CARDINAL;
      (* number of argument words; the loop bound in Marshal/Unmarshal *)
    params: SET OF [ 0..200 ]
      (* NotIn = Value.Word; In = Value.NetworkObject *)
  END;

(* ---------------------------- RPC Protocol ------------------------------ *)

(* Client side of a remote call.

   "params" points at the first argument word.  That word is read as a
   NETWORK reference and its "pid" field selects the destination
   processor.  "proc" and "pt" are sent in the message's CallDescrHeader.

   The arguments are marshalled into a NewCall message, the message is
   sent, and the reply is received into a ReturnDescr.  On a normal
   outcome the result word is returned (0 when the result class is None).
   If the reply names an exception, it is raised here with the unmarshalled
   argument. *)
PROCEDURE Client( params: UNTRACED REF Word.T;
                  proc: ADDRESS;
                  pt: ProcType ): Word.T RAISES ANY =
  CONST
    HeaderSize = ADRSIZE( IPC.Header );
    CallDescrHeaderSize = ADRSIZE( CallDescrHeader );
  VAR
    buf: REF ARRAY OF Word.T;
    retBuf: ReturnDescr;
    (* Initialized explicitly: when the result class is None no branch
       below assigns retval, and a variable without an initializer holds
       an arbitrary value. *)
    retval: Word.T := 0;
    destPid: INTEGER := LOOPHOLE( params, UNTRACED REF NETWORK )^.pid;
    class: ReturnValue;
  BEGIN
    (***** RPC step 0 *****)
    (* message layout: IPC.Header, CallDescrHeader, encoded arguments *)
    buf := NEW( REF ARRAY OF Word.T,
                HeaderSize + CallDescrHeaderSize + pt.params.encodingSize );
    LOOPHOLE( ADR( buf[0] ), IPC.HeaderPtr )^ :=
      IPC.Header{
        type := IPC.MsgType.NewCall,
        src := IPC.GlobalID{ pid := Processor.ID(),
                             id := LOOPHOLE( Thread.Self(), INTEGER ) },
        param := CallDescrHeaderSize + pt.params.encodingSize };
    LOOPHOLE( ADR( buf[ HeaderSize ] ), CallDescrHeaderPtr )^ :=
      CallDescrHeader{ proc := proc, pt := pt };
    Marshal( pt, params, buf, HeaderSize + CallDescrHeaderSize );

    (* Both buffers are passed as (address of first word, address of last
       word). *)
    IPC.SendAndReceiveBuffer(
      destPid,
      ADR( buf[0] ), ADR( buf[LAST( buf^ )] ),
      ADR( retBuf ),
      ADR( retBuf ) + ADRSIZE( retBuf ) - ADRSIZE( ADDRESS ));

    (***** RPC step 2 *****)
    IF retBuf.exception = NIL THEN
      (* normal outcome *)
      class := pt.returnType
    ELSE
      (* exceptional outcome *)
      class := VAL( RTExceptSafe.ArgClass( retBuf.exception ), ReturnValue )
    END;

    CASE class OF
      ReturnValue.None => (* skip *)
    | ReturnValue.Word =>
        retval := retBuf.word0
    | ReturnValue.NetworkObject =>
        VAR
          net: NETWORK := UnmarshalOne( retBuf.word0, retBuf.word1 );
        BEGIN
          retval := LOOPHOLE( net, INTEGER );
          IF net # NIL AND net.pid # Processor.ID() THEN
            (* acknowledgement needed *)
            (* This condition mirrors the one under which Server sets
               "serverThread" and waits after sending its reply. *)
            <* ASSERT retBuf.serverThread # 0 *>
            VAR
              header: IPC.Header;
            BEGIN
              (* NOTE(review): header.src is not assigned here;
                 presumably it is not examined for Reply messages --
                 confirm against IPC. *)
              header.type := IPC.MsgType.Reply;
              header.param := retBuf.serverThread;
              IPC.SendBuffer(
                destPid,
                ADR( header ),
                ADR( header ) + ADRSIZE( header ) - ADRSIZE( ADDRESS ))
            END
          END
        END
    END;

    IF retBuf.exception = NIL THEN
      (* normal outcome *)
      RETURN retval
    ELSE
      RTException.RaiseKRML( retBuf.exception,
                             LOOPHOLE( retval, RTException.ExceptionArg ));
      <* ASSERT FALSE *> (* control should never reach this point *)
    END
  END Client;

(* Server side of a remote call.

   "cl.callDescr" holds a CallDescrHeader followed by the encoded
   arguments; "cl.client" identifies the calling thread.  The arguments
   are unmarshalled in place, starting at index 0 of "cl.callDescr", the
   procedure is invoked through "Call", and a Reply message carrying a
   ReturnDescr is sent back to the client.

   When the result is a network object with a nonzero id whose pid differs
   from the client's, the reply is sent with SendAndReceiveBuffer rather
   than SendBuffer, so this procedure also receives before returning;
   Client sends a matching Reply in that case.

   Always returns NIL. *)
PROCEDURE Server( cl: NewClosure ): REFANY =
  VAR
    proc: ADDRESS;
    pt: ProcType;
    retMsg: RECORD
      header: IPC.Header;
      retBuf: ReturnDescr
    END;
    needAcknowledgement: BOOLEAN := FALSE;
    exc: RTExceptSafe.ExceptionName;
    retval: Word.T;
  BEGIN
    (* Give every reply field that is not assigned on all paths a defined
       value, so that arbitrary stack contents are never transmitted and
       "serverThread" is 0 unless an acknowledgement is requested. *)
    retMsg.retBuf.serverThread := 0;
    retMsg.retBuf.word0 := 0;
    retMsg.retBuf.word1 := 0;

    (***** RPC step 1 *****)
    WITH h = LOOPHOLE( ADR( cl.callDescr^[0] ), CallDescrHeaderPtr )^ DO
      proc := h.proc;
      pt := h.pt;
    END;
    (* The header fields were copied out above because unmarshalling
       writes the argument words from index 0, over the header. *)
    Unmarshal( pt, cl.callDescr, 0, ADRSIZE( CallDescrHeader ));
    exc := Call( proc, pt.params.formalsSize, ADR( cl.callDescr[0] ), retval );
    retMsg.retBuf.exception := exc;

    VAR
      class: ReturnValue;
    BEGIN
      IF exc = NIL THEN
        class := pt.returnType
      ELSE
        class := VAL( RTExceptSafe.ArgClass( exc ), ReturnValue )
      END;
      CASE class OF
        ReturnValue.None => (* skip *)
      | ReturnValue.Word =>
          retMsg.retBuf.word0 := retval
      | ReturnValue.NetworkObject =>
          VAR
            gid := GetGlobalID( LOOPHOLE( retval, NETWORK ));
          BEGIN
            retMsg.retBuf.word0 := gid.pid;
            retMsg.retBuf.word1 := gid.id;
            needAcknowledgement := gid.id # 0 AND gid.pid # cl.client.pid
          END
      END
    END;

    (* NOTE(review): retMsg.header.src is not assigned; presumably it is
       not examined for Reply messages -- confirm against IPC. *)
    retMsg.header.type := IPC.MsgType.Reply;
    retMsg.header.param := cl.client.id;
    IF NOT needAcknowledgement THEN
      IPC.SendBuffer(
        cl.client.pid,
        ADR( retMsg ),
        ADR( retMsg ) + ADRSIZE( retMsg ) - ADRSIZE( ADDRESS ))
    ELSE
      retMsg.retBuf.serverThread := LOOPHOLE( Thread.Self(), INTEGER );
      IPC.SendAndReceiveBuffer(
        cl.client.pid,
        ADR( retMsg ),
        ADR( retMsg ) + ADRSIZE( retMsg ) - ADRSIZE( ADDRESS ),
        NIL, NIL )
      (***** RPC step 2 *****)
      (* (that's it!) *)
    END;
    RETURN NIL
  END Server;

(* ------------------------- Remote NEW Protocol -------------------------- *)

(* Allocate an object of type "def" on processor "destPid".

   If "destPid" is the current processor, "bindings" is returned when it
   is not NIL; otherwise the result of RTHeap.Allocate( def ) is returned.

   For any other processor, "destPid" must lie within
   [Processor.Min() .. Processor.Max()]; if not, RTMisc.FatalError is
   called.  A New message is then sent and the one-word reply is used as
   the id of the new object.  A local surrogate for (destPid, id) is
   allocated, entered in the surrogate table, and returned.

   When "bindings" is not NIL, the message buffer is taken from the word
   stored just below "bindings"; that buffer is sent in place of a bare
   header and disposed afterwards.
   NOTE(review): the layout of "bindings" is established by its producer,
   which is not in this module -- confirm there. *)
PROCEDURE RemoteNew( destPid: INTEGER;
                     def: RT0.TypeDefinition;
                     bindings: ADDRESS ): ADDRESS =
  VAR
    pid := Processor.ID();
    concrete: INTEGER;
    net: ADDRESS := NIL;
    sur: IPC.Surrogate;
  BEGIN
    IF destPid = pid THEN
      IF bindings # NIL THEN RETURN bindings END;
      RETURN RTHeap.Allocate( def )
    END;
    IF destPid < Processor.Min() OR destPid > Processor.Max() THEN
      RTMisc.FatalError( RTMisc.Fault.IncorrectProcessor )
    END;

    (***** Remote NEW step 0 *****)
    IF bindings = NIL THEN
      VAR
        h: IPC.Header;
      BEGIN
        h.type := IPC.MsgType.New;
        h.src := IPC.GlobalID{ pid := pid,
                               id := LOOPHOLE( Thread.Self(), INTEGER ) };
        h.param := LOOPHOLE( def, INTEGER );
        (* "concrete" is a one-word receive buffer, so its first and last
           word addresses coincide. *)
        IPC.SendAndReceiveBuffer(
          destPid,
          ADR( h ), ADR( h ) + ADRSIZE( h ) - ADRSIZE( ADDRESS ),
          ADR( concrete ), ADR( concrete ))
      END
    ELSE
      VAR
        a := LOOPHOLE( bindings - ADRSIZE( Word.T ),
                       UNTRACED REF UNTRACED REF ARRAY OF Word.T )^;
      BEGIN
        (* the IPC header is written at the front of the buffer *)
        WITH h = LOOPHOLE( ADR( a[0] ), IPC.HeaderPtr )^ DO
          h.type := IPC.MsgType.New;
          h.src := IPC.GlobalID{ pid := pid,
                                 id := LOOPHOLE( Thread.Self(), INTEGER ) };
          h.param := LOOPHOLE( def, INTEGER );
          IPC.SendAndReceiveBuffer(
            destPid,
            ADR( a[0] ), ADR( a[LAST( a^ )] ),
            ADR( concrete ), ADR( concrete ))
        END;
        DISPOSE( a )
      END
    END;

    (***** Remote NEW step 2 *****)
    sur := NEW( IPC.Surrogate,
                gid := IPC.GlobalID{ pid := destPid, id := concrete } );
    (* allocation and table insertion happen in one critical section *)
    System.EnterSystemCritical();
    net := RTHeap.AllocateSurrogate( def, sur );
    Surrogate.Add( destPid, concrete, net );
    System.ExitSystemCritical();
    RETURN net
  END RemoteNew;

(* ------------------ Marshalling and Unmarshalling ----------------------- *)

(* Encode the arguments starting at "params" into "buf" from index "i".

   For each argument index in [0 .. pt.params.formalsSize - 1]: if the
   index is a member of "pt.params.params" the argument word is treated as
   a NETWORK reference and written as two words (pid, id); otherwise the
   word is copied unchanged.  On exit the encoding must have filled "buf"
   exactly (checked by the ASSERT). *)
PROCEDURE Marshal( pt: ProcType;
                   params: UNTRACED REF Word.T;
                   buf: REF ARRAY OF Word.T;
                   i: CARDINAL ) =
  VAR
    gid: IPC.GlobalID;
  BEGIN
    WITH P = pt.params.params DO
      FOR param := 0 TO pt.params.formalsSize - 1 DO
        IF param IN P THEN
          (* Value.NetworkObject *)
          gid := GetGlobalID( LOOPHOLE( params^, NETWORK ));
          buf[ i ] := gid.pid;
          buf[ i+1 ] := gid.id;
          INC( i, 2 );
          INC( params, ADRSIZE( params^ ))
        ELSE
          (* Value.Word *)
          buf[ i ] := params^;
          INC( i );
          INC( params, ADRSIZE( params^ ))
        END
      END
    END;
    <* ASSERT NUMBER( buf^ ) = i *>
  END Marshal;

(* Decode arguments in place within "buf".

   Encoded arguments are read starting at index "iRead" and the decoded
   argument words are written starting at index "iWrite".  A network
   object occupies two words when encoded and one when decoded.
   NOTE(review): in-place decoding requires iWrite <= iRead on entry; the
   one call in this module passes iWrite = 0. *)
PROCEDURE Unmarshal( pt: ProcType;
                     buf: REF ARRAY OF Word.T;
                     iWrite: CARDINAL;
                     iRead: CARDINAL ) =
  BEGIN
    WITH P = pt.params.params DO
      FOR param := 0 TO pt.params.formalsSize - 1 DO
        IF param IN P THEN
          (* Value.NetworkObject *)
          buf[ iWrite ] :=
            LOOPHOLE( UnmarshalOne( buf[iRead], buf[iRead+1] ), Word.T );
          INC( iWrite );
          INC( iRead, 2 )
        ELSE
          (* Value.Word *)
          IF iWrite # iRead THEN
            buf[ iWrite ] := buf[ iRead ]
          END;
          INC( iWrite );
          INC( iRead )
        END
      END
    END
  END Unmarshal;

(* Convert a (pid, id) pair into a local NETWORK reference.

     - id = 0 denotes NIL (pid must then be 0 as well);
     - if "pid" is this processor, "id" itself is the reference;
     - otherwise the surrogate table is consulted; when no surrogate
       exists yet, an IncRef message is sent to "pid", the reply supplies
       the object's type definition, and a new surrogate is allocated and
       entered in the table.

   The table is searched again before the new surrogate is added, because
   it may have been updated while this thread was waiting for the IncRef
   reply. *)
PROCEDURE UnmarshalOne( pid, id: INTEGER ): NETWORK =
  VAR
    sur: IPC.Surrogate := NIL;
    h: IPC.Header;
    def: RT0.TypeDefinition := NIL;
  BEGIN
    IF id = 0 THEN
      <* ASSERT pid = 0 *>
      RETURN NIL
    END;
    IF pid = Processor.ID() THEN
      RETURN LOOPHOLE( id, NETWORK )
    END;

    VAR
      found: BOOLEAN;
    BEGIN
      System.EnterSystemCritical();
      found := Surrogate.Find( pid, id, sur );
      System.ExitSystemCritical();
      IF found THEN RETURN LOOPHOLE( sur, NETWORK ) END
    END;

    (* increment the reference count of the concrete object and get back
       a pointer to the typecell of the object's type *)
    h.type := IPC.MsgType.IncRef;
    h.src := IPC.GlobalID{ pid := Processor.ID(),
                           id := LOOPHOLE( Thread.Self(), INTEGER ) };
    h.param := id;
    IPC.SendAndReceiveBuffer(
      pid,
      ADR( h ), ADR( h ) + ADRSIZE( h ) - ADRSIZE( ADDRESS ),
      ADR( def ), ADR( def ));

    VAR
      s := NEW( IPC.Surrogate, gid := IPC.GlobalID{ pid := pid, id := id } );
      found: BOOLEAN;
    BEGIN
      System.EnterSystemCritical();
      found := Surrogate.Find( pid, id, sur );
      IF NOT found THEN
        sur := RTHeap.AllocateSurrogate( def, s );
        Surrogate.Add( pid, id, sur );
      END;
      System.ExitSystemCritical();
      IF found THEN
        (* "s" was not used; release it *)
        DISPOSE( s )
      ELSE
        RETURN LOOPHOLE( sur, NETWORK )
      END
    END;

    (* Some other thread entered this surrogate into the table between the
       two critical sections above.  Thus, decrement the reference count
       for this object again.  Then, return 'sur', which is the local
       surrogate. *)
    h.type := IPC.MsgType.DecRef;
    h.param := id;
    IPC.SendBuffer(
      pid,
      ADR( h ), ADR( h ) + ADRSIZE( h ) - ADRSIZE( ADDRESS ));
    RETURN LOOPHOLE( sur, NETWORK )
  END UnmarshalOne;

(* Return the global identity of "net":

     - NIL maps to (pid = 0, id = 0);
     - an object whose "pid" is this processor maps to this processor's
       id paired with the reference itself, viewed as an INTEGER;
     - anything else is treated as an IPC.Surrogate and its stored "gid"
       is returned. *)
PROCEDURE GetGlobalID( net: NETWORK ): IPC.GlobalID =
  VAR
    pid := Processor.ID();
  BEGIN
    IF net = NIL THEN
      RETURN IPC.GlobalID{ pid := 0, id := 0 }
    ELSIF net.pid = pid THEN
      RETURN IPC.GlobalID{ pid := pid, id := LOOPHOLE( net, INTEGER ) }
    ELSE
      RETURN LOOPHOLE( net, IPC.Surrogate ).gid
    END
  END GetGlobalID;

BEGIN
END RPC.