diff --git a/OpenSim/Region/CoreModules/RegionSync/RegionSyncModule/PhysEngineToSceneConnector.cs b/OpenSim/Region/CoreModules/RegionSync/RegionSyncModule/PhysEngineToSceneConnector.cs new file mode 100644 index 0000000000..62bdcf6efc --- /dev/null +++ b/OpenSim/Region/CoreModules/RegionSync/RegionSyncModule/PhysEngineToSceneConnector.cs @@ -0,0 +1,530 @@ +using System; +using System.IO; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Collections; +using System.Collections.Generic; +using System.Threading; +using OpenMetaverse; +using OpenMetaverse.Packets; +using OpenMetaverse.StructuredData; +using OpenSim.Framework; +using OpenSim.Services.Interfaces; +using OpenSim.Framework.Client; +using OpenSim.Region.Physics.Manager; +using OpenSim.Region.Framework.Scenes; +using OpenSim.Region.Framework.Scenes.Serialization; +using OpenSim.Region.Framework.Interfaces; +using OpenSim.Region.Framework.Scenes.Types; +using log4net; + +using Nini.Config; + +namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule +{ + //The data structure that maintains the list of quarks a script engine subscribes to. + //It might be better to organize the quarks in a k-d tree structure, for easier + //partitioning of the quarks based on spatial information. + //But for now, we just assume the quarks each script engine operates on form a rectangle shape. + //So we just use xmin,ymin and xmax,ymax to identify the rectange; and use a List structure to + //store the quarks. + //Quark size is defined in QuarkInfo.SizeX and QuarkInfo.SizeY. + + // The RegionSyncPhysEngine has a receive thread to process messages from the RegionSyncServer. + // It is the client side of the synchronization channel, and send to and receive updates from the + // Auth. Scene. The server side thread handling the sync channel is implemented in RegionSyncScriptAPI.cs. + // + // The current implementation is very similar to RegionSyncClient. + // TODO: eventually the same RegionSyncSceneAPI should handle all traffic from different actors, e.g. + // using a pub/sub model. + public class PhysEngineToSceneConnector + { + #region PhysEngineToSceneConnector members + + // Set the addr and port of RegionSyncServer + private IPAddress m_addr; + private string m_addrString; + private Int32 m_port; + + // A reference to the local scene + private Scene m_validLocalScene; + + // The avatars added to this client manager for clients on other client managers + object m_syncRoot = new object(); + + // The logfile + private ILog m_log; + + private string LogHeader = "[PHYSICS ENGINE TO SCENE CONNECTOR]"; + + // The listener and the thread which listens for connections from client managers + private Thread m_rcvLoop; + + // The client connection to the RegionSyncServer + private TcpClient m_client = new TcpClient(); + + + //KittyL: Comment out m_statsTimer for now, will figure out whether we need it for PhysEngine later + //private System.Timers.Timer m_statsTimer = new System.Timers.Timer(30000); + + // The queue of incoming messages which need handling + //private Queue m_inQ = new Queue(); + + //KittyL: added to identify different actors + private ActorType m_actorType = ActorType.PhysicsEngine; + + private bool m_debugWithViewer = false; + private long m_messagesSent = 0; + private long m_messagesReceived = 0; + + private QuarkSubsriptionInfo m_subscribedQuarks; + + + private IConfig m_sysConfig; + + //members for load balancing purpose + //private TcpClient m_loadMigrationSouceEnd = null; + private LoadMigrationEndPoint m_loadMigrationSouceEnd = null; + private Thread m_loadMigrationSrcRcvLoop; + private LoadMigrationListener m_loadMigrationListener = null; + + //List of queued messages, when the space that the updated object is located is being migrated + private List m_updateMsgQueue = new List(); + + #endregion + + + // Constructor + public PhysEngineToSceneConnector(Scene validLocalScene, string addr, int port, bool debugWithViewer, + IConfig sysConfig) + { + m_log = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType); + m_validLocalScene = validLocalScene; + m_addr = IPAddress.Parse(addr); + m_addrString = addr; + m_port = port; + m_debugWithViewer = debugWithViewer; + //m_statsTimer.Elapsed += new System.Timers.ElapsedEventHandler(StatsTimerElapsed); + m_sysConfig = sysConfig; + + SceneToPhysEngineSyncServer.logEnabled = m_sysConfig.GetBoolean("LogEnabled", false); + SceneToPhysEngineSyncServer.logDir = m_sysConfig.GetString("LogDir", "."); + + //assume we are connecting to the whole scene as one big quark + m_subscribedQuarks = new QuarkSubsriptionInfo(0, 0, (int)Constants.RegionSize, (int)Constants.RegionSize); + } + + private List GetQuarkStringList() + { + List quarkList = new List(); + foreach (QuarkInfo quark in m_subscribedQuarks.QuarkList) + { + quarkList.Add(quark.QuarkStringRepresentation); + } + return quarkList; + } + + // Start the RegionSyncPhysEngine client thread + public bool Start() + { + if (EstablishConnection()) + { + StartStateSync(); + return true; + } + else + { + return false; + } + } + + private bool EstablishConnection() + { + if (m_client.Connected) + { + m_log.Warn(LogHeader + ": already connected"); + return false; + } + + try + { + m_client.Connect(m_addr, m_port); + } + catch (Exception e) + { + m_log.WarnFormat("{0} [Start] Could not connect to SceneToPhysEngineSyncServer at {1}:{2}", LogHeader, m_addr, m_port); + m_log.Warn(e.Message); + return false; + } + + m_log.WarnFormat("{0} Connected to SceneToPhysEngineSyncServer at {1}:{2}", LogHeader, m_addr, m_port); + + m_rcvLoop = new Thread(new ThreadStart(ReceiveLoop)); + m_rcvLoop.Name = "PhysEngineToSceneConnector ReceiveLoop"; + m_log.WarnFormat("{0} Starting {1} thread", LogHeader, m_rcvLoop.Name); + m_rcvLoop.Start(); + return true; + } + + private void StartStateSync() + { + RegionSyncMessage msg = new RegionSyncMessage(RegionSyncMessage.MsgType.ActorStatus, Convert.ToString((int)ActorStatus.Sync)); + Send(msg); + // SendQuarkSubscription(); + Thread.Sleep(100); + DoInitialSync(); + } + + + private void SendQuarkSubscription() + { + List quarkStringList = GetQuarkStringList(); + string quarkString = RegionSyncUtil.QuarkStringListToString(quarkStringList); + + m_log.Debug(LogHeader + ": subscribe to quarks: " + quarkString); + //Send(quarkString); + RegionSyncMessage msg = new RegionSyncMessage(RegionSyncMessage.MsgType.QuarkSubscription, quarkString); + Send(msg); + } + + public void SetQuarkSubscription(QuarkSubsriptionInfo quarks) + { + m_subscribedQuarks = quarks; + } + + public void RegisterIdle() + { + EstablishConnection(); + RegionSyncMessage msg = new RegionSyncMessage(RegionSyncMessage.MsgType.ActorStatus, Convert.ToString((int)ActorStatus.Idle)); + Send(msg); + } + + private void DoInitialSync() + { + // m_validLocalScene.DeleteAllSceneObjects(); + //m_log.Debug(LogHeader + ": send actor type " + m_actorType); + //Send(new RegionSyncMessage(RegionSyncMessage.MsgType.ActorType, Convert.ToString((int)m_actorType))); + //KittyL??? Do we need to send in RegionName? + + //Send(new RegionSyncMessage(RegionSyncMessage.MsgType.RegionName, m_scene.RegionInfo.RegionName)); + //m_log.WarnFormat("Sending region name: \"{0}\"", m_scene.RegionInfo.RegionName); + + // Send(new RegionSyncMessage(RegionSyncMessage.MsgType.GetTerrain)); + // Send(new RegionSyncMessage(RegionSyncMessage.MsgType.GetObjects)); + + // Register for events which will be forwarded to authoritative scene + // m_scene.EventManager.OnNewClient += EventManager_OnNewClient; + //m_scene.EventManager.OnClientClosed += new EventManager.ClientClosed(RemoveLocalClient); + } + + // Disconnect from the RegionSyncServer and close client thread + public void Stop() + { + Send(new RegionSyncMessage(RegionSyncMessage.MsgType.ActorStop, "stop")); + // The remote scene will remove the SceneToPhysEngineConnector when we disconnect + m_rcvLoop.Abort(); + ShutdownClient(); + + //stop the migration connections + //ShutdownClient(m_loadMigrationSouceEnd); + if (m_loadMigrationListener != null) + m_loadMigrationListener.Shutdown(); + } + + public void ReportStatus() + { + m_log.WarnFormat("{0} Synchronized to RegionSyncServer at {1}:{2}", LogHeader, m_addr, m_port); + m_log.WarnFormat("{0} Received={1}, Sent={2}", LogHeader, m_messagesReceived, m_messagesSent); + lock (m_syncRoot) + { + //TODO: should be reporting about the information of the objects/scripts + } + } + + private void ShutdownClient() + { + m_log.WarnFormat("{0} Disconnected from RegionSyncServer. Shutting down.", LogHeader); + + //TODO: remove the objects and scripts + //lock (m_syncRoot) + //{ + + //} + + if (m_client != null) + { + // Close the connection + m_client.Client.Close(); + m_client.Close(); + } + SceneToPhysEngineSyncServer.PhysLogMessageClose(); + } + + // Listen for messages from a RegionSyncServer + // *** This is the main thread loop for each connected client + private void ReceiveLoop() + { + m_log.WarnFormat("{0} Thread running: {1}", LogHeader, m_rcvLoop.Name); + while (true && m_client.Connected) + { + RegionSyncMessage msg; + // Try to get the message from the network stream + try + { + msg = new RegionSyncMessage(m_client.GetStream()); + //m_log.WarnFormat("{0} Received: {1}", LogHeader, msg.ToString()); + } + // If there is a problem reading from the client, shut 'er down. + catch + { + ShutdownClient(); + return; + } + // Try handling the message + try + { + //lock (m_syncRoot) -- KittyL: why do we need to lock here? We could lock inside HandleMessage if necessary, and lock on different objects for better performance + m_messagesReceived++; + HandleMessage(msg); + } + catch (Exception e) + { + m_log.WarnFormat("{0} Encountered an exception: {1} (MSGTYPE = {2})", LogHeader, e.Message, msg.ToString()); + } + } + } + + #region SEND + //DSG-TODO: for Scene based DSG, Send() also needs to figure out which Scene to send to, e.g. needs a switching function based on object position + + // Send a message to a single connected RegionSyncServer + private void Send(string msg) + { + byte[] bmsg = System.Text.Encoding.ASCII.GetBytes(msg + System.Environment.NewLine); + Send(bmsg); + } + + private void Send(RegionSyncMessage msg) + { + Send(msg.ToBytes()); + //m_log.WarnFormat("{0} Sent {1}", LogHeader, msg.ToString()); + } + + private void Send(byte[] data) + { + if (m_client.Connected) + { + try + { + m_client.GetStream().Write(data, 0, data.Length); + m_messagesSent++; + } + // If there is a problem reading from the client, shut 'er down. + // *** Still need to alert the module that it's no longer connected! + catch + { + ShutdownClient(); + } + } + } + #endregion SEND + + //KittyL: Has to define SendCoarseLocations() here, since it's defined in IRegionSyncClientModule. + // But should not do much as being PhysEngine, not ClientManager + public void SendCoarseLocations() + { + } + + // Handle an incoming message + // Dan-TODO: This should not be synchronous with the receive! + // Instead, handle messages from an incoming Queue so server doesn't block sending + // + // KittyL: This is the function that PhysEngine and ClientManager have the most different implementations + private void HandleMessage(RegionSyncMessage msg) + { + //TO FINISH: + + SceneToPhysEngineSyncServer.PhysLogMessage(false, msg); + switch (msg.Type) + { + case RegionSyncMessage.MsgType.RegionName: + { + return; + } + case RegionSyncMessage.MsgType.PhysUpdateAttributes: + { + HandlePhysUpdateAttributes(msg); + return; + } + default: + { + RegionSyncMessage.HandleError(LogHeader, msg, String.Format("{0} Unsupported message type: {1}", LogHeader, ((int)msg.Type).ToString())); + return; + } + } + + } + + /// + /// The physics engine has some updates to the attributes. Unpack the parameters, find the + /// correct PhysicsActor and plug in the new values; + /// + /// + private void HandlePhysUpdateAttributes(RegionSyncMessage msg) + { + // TODO: + OSDMap data = RegionSyncUtil.DeserializeMessage(msg, LogHeader); + try + { + UUID uuid = data["uuid"].AsUUID(); + string actorID = data["actorID"].AsString(); + // m_log.DebugFormat("{0}: HandlPhysUpdateAttributes for {1}", LogHeader, uuid); + PhysicsActor pa = FindPhysicsActor(uuid); + if (pa != null) + { + // pa.Size = data["size"].AsVector3(); + pa.Position = data["position"].AsVector3(); + pa.Force = data["force"].AsVector3(); + pa.Velocity = data["velocity"].AsVector3(); + pa.RotationalVelocity = data["rotationalVelocity"].AsVector3(); + pa.Acceleration = data["acceleration"].AsVector3(); + pa.Torque = data["torque"].AsVector3(); + pa.Orientation = data["orientation"].AsQuaternion(); + pa.IsPhysical = data["isPhysical"].AsBoolean(); // receive?? + pa.Flying = data["flying"].AsBoolean(); // receive?? + pa.Kinematic = data["kinematic"].AsBoolean(); // receive?? + pa.Buoyancy = (float)(data["buoyancy"].AsReal()); + SceneObjectPart sop = m_validLocalScene.GetSceneObjectPart(uuid); + if (sop != null) + { + pa.Shape = sop.Shape; + } + pa.ChangingActorID = actorID; + m_validLocalScene.PhysicsScene.AddPhysicsActorTaint(pa); + } + else + { + m_log.WarnFormat("{0}: attribute update for unknown uuid {1}", LogHeader, uuid); + return; + } + } + catch (Exception e) + { + m_log.WarnFormat("{0}: EXCEPTION processing UpdateAttributes: {1}", LogHeader, e); + return; + } + return; + } + + // Find the physics actor whether it is an object or a scene presence + private PhysicsActor FindPhysicsActor(UUID uuid) + { + SceneObjectPart sop = m_validLocalScene.GetSceneObjectPart(uuid); + if (sop != null) + { + return sop.PhysActor; + } + ScenePresence sp = m_validLocalScene.GetScenePresence(uuid); + if (sp != null) + { + return sp.PhysicsActor; + } + return null; + } + + public void SendPhysUpdateAttributes(PhysicsActor pa) + { + // m_log.DebugFormat("{0}: SendPhysUpdateAttributes for {1}", LogHeader, pa.UUID); + OSDMap data = new OSDMap(17); + data["time"] = OSD.FromString(DateTime.Now.ToString("yyyyMMddHHmmssfff")); + data["localID"] = OSD.FromUInteger(pa.LocalID); + data["uuid"] = OSD.FromUUID(pa.UUID); + data["actorID"] = OSD.FromString(RegionSyncServerModule.ActorID); + data["size"] = OSD.FromVector3(pa.Size); + data["position"] = OSD.FromVector3(pa.Position); + data["force"] = OSD.FromVector3(pa.Force); + data["velocity"] = OSD.FromVector3(pa.Velocity); + data["rotationalVelocity"] = OSD.FromVector3(pa.RotationalVelocity); + data["acceleration"] = OSD.FromVector3(pa.Acceleration); + data["torque"] = OSD.FromVector3(pa.Torque); + data["orientation"] = OSD.FromQuaternion(pa.Orientation); + data["isPhysical"] = OSD.FromBoolean(pa.IsPhysical); + data["flying"] = OSD.FromBoolean(pa.Flying); + data["buoyancy"] = OSD.FromReal(pa.Buoyancy); + data["isColliding"] = OSD.FromBoolean(pa.IsColliding); + data["isCollidingGround"] = OSD.FromBoolean(pa.CollidingGround); + + RegionSyncMessage rsm = new RegionSyncMessage(RegionSyncMessage.MsgType.PhysUpdateAttributes, + OSDParser.SerializeJsonString(data)); + Send(rsm); + return; + } + + #region Utility functions + + private OSDMap GetOSDMap(string strdata) + { + OSDMap args = null; + OSD buffer = OSDParser.DeserializeJson(strdata); + if (buffer.Type == OSDType.Map) + { + args = (OSDMap)buffer; + return args; + } + return null; + + } + + HashSet exceptions = new HashSet(); + private OSDMap DeserializeMessage(RegionSyncMessage msg) + { + OSDMap data = null; + try + { + data = OSDParser.DeserializeJson(Encoding.ASCII.GetString(msg.Data, 0, msg.Length)) as OSDMap; + } + catch (Exception e) + { + lock (exceptions) + // If this is a new message, then print the underlying data that caused it + if (!exceptions.Contains(e.Message)) + m_log.Error(LogHeader + " " + Encoding.ASCII.GetString(msg.Data, 0, msg.Length)); + data = null; + } + return data; + } + + + + public string GetServerAddressAndPort() + { + return m_addr.ToString() + ":" + m_port.ToString(); + } + + #endregion Utility functions + + #region Handlers for Scene events + + private void HandleAddOrUpdateObjectInLocalScene(RegionSyncMessage msg) + { + // TODO: modify for physics + OSDMap data = DeserializeMessage(msg); + /* + if (data["locX"] == null || data["locY"] == null || data["sogXml"] == null) + { + m_log.Warn(LogHeader + ": parameters missing in NewObject/UpdatedObject message, need to have locX, locY, sogXml"); + return; + } + * */ + uint locX = data["locX"].AsUInteger(); + uint locY = data["locY"].AsUInteger(); + string sogxml = data["sogXml"].AsString(); + SceneObjectGroup sog = SceneObjectSerializer.FromXml2Format(sogxml); + + } + + #endregion Handlers for events/updates from Scene + + } +} diff --git a/OpenSim/Region/CoreModules/RegionSync/RegionSyncModule/PhysEngineToSceneConnectorModule.cs b/OpenSim/Region/CoreModules/RegionSync/RegionSyncModule/PhysEngineToSceneConnectorModule.cs new file mode 100644 index 0000000000..0e7d94276c --- /dev/null +++ b/OpenSim/Region/CoreModules/RegionSync/RegionSyncModule/PhysEngineToSceneConnectorModule.cs @@ -0,0 +1,472 @@ +/* + * Copyright (c) Contributors, http://opensimulator.org/ + * See CONTRIBUTORS.TXT for a full list of copyright holders. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of the OpenSim Project nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +using System; +using System.Collections.Generic; +using System.Reflection; +using Nini.Config; +using OpenMetaverse; +using OpenSim.Framework; +using OpenSim.Framework.Client; +using OpenSim.Region.CoreModules.Framework.InterfaceCommander; +using OpenSim.Region.Framework.Interfaces; +using OpenSim.Region.Framework.Scenes; +using OpenSim.Region.Physics.Manager; +using OpenSim.Services.Interfaces; +using log4net; +using System.Net; +using System.Net.Sockets; + +namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule +{ + //The connector that connects the local Scene (cache) and remote authoratative Scene + public class PhysEngineToSceneConnectorModule : IRegionModule, IPhysEngineToSceneConnectorModule, ICommandableModule + { + #region PhysEngineToSceneConnectorModule members and functions + + private static int m_activeActors = 0; + private bool m_active = false; + private string m_serveraddr; + private int m_serverport; + private Scene m_scene; + private static List m_allScenes = new List(); + private ILog m_log; + private Object m_client_lock = new Object(); + //private PhysEngineToSceneConnector m_scriptEngineToSceneConnector = null; + private IConfig m_syncConfig = null; + public IConfig SyncConfig { get { return m_syncConfig; } } + private bool m_debugWithViewer = false; + public bool DebugWithViewer { get { return m_debugWithViewer; } } + private string m_regionSyncMode = ""; + + //Variables relavant for multi-scene subscription. + private Dictionary m_PEToSceneConnectors = new Dictionary(); //connector for each auth. scene + private string LogHeader = "[PhysEngineToSceneConnectorModule]"; + private PhysEngineToSceneConnector m_idlePEToSceneConnector = null; + private PhysEngineToSceneConnector m_physEngineToSceneConnector = null; + + //quark information + //private int QuarkInfo.SizeX; + //private int QuarkInfo.SizeY; + //private string m_quarkListString; + private string m_subscriptionSpaceString; + + #endregion PhysEngineToSceneConnectorModule members and functions + + #region IRegionModule Members + + public void Initialise(Scene scene, IConfigSource config) + { + m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); + m_active = false; //set to false unless this is the valid local scene + + //Read in configuration + IConfig syncConfig = config.Configs["RegionSyncModule"]; + if (syncConfig != null + && syncConfig.GetBoolean("Enabled", false) + // && syncConfig.GetString("Mode", "").ToLower() == "client" + && syncConfig.GetBoolean("PhysEngineClient", false) + ) + { + //scene.RegionSyncEnabled = true; + } + else + { + //scene.RegionSyncEnabled = false; + m_log.Warn(LogHeader + ": Not in physics engine client mode. Shutting down."); + return; + } + + m_active = true; + m_activeActors++; + + m_log.Debug(LogHeader + " Init PEToSceneConnectorModule, for local scene " + scene.RegionInfo.RegionName); + + m_scene = scene; + m_scene.RegisterModuleInterface(this); + m_syncConfig = syncConfig; + m_debugWithViewer = syncConfig.GetBoolean("PhysEngineDebugWithViewer", false); + + //read in the quark size information + //QuarkInfo.SizeX = syncConfig.GetInt("QuarkSizeX", (int)Constants.RegionSize); + //QuarkInfo.SizeY = syncConfig.GetInt("QuarkSizeY", (int)Constants.RegionSize); + QuarkInfo.SizeX = syncConfig.GetInt("QuarkSizeX", (int)Constants.RegionSize); + QuarkInfo.SizeY = syncConfig.GetInt("QuarkSizeY", (int)Constants.RegionSize); + + //m_quarkListString = syncConfig.GetString("InitQuarkSet", ""); //if not specified, dost not subscribe to any quark + //if (m_quarkListString.Equals("all")) + //{ + // m_quarkListString = RegionSyncUtil.QuarkStringListToString(RegionSyncUtil.GetAllQuarkStringInScene(QuarkInfo.SizeX, QuarkInfo.SizeY)); + //} + m_subscriptionSpaceString = syncConfig.GetString("InitSubscriptionSpace", "0_0,256_256"); + + + // Setup the command line interface + m_scene.EventManager.OnPluginConsole += EventManager_OnPluginConsole; + InstallInterfaces(); + + m_log.Warn(LogHeader + " Initialised"); + + // collect all the scenes for later routing + if (!m_allScenes.Contains(scene)) + { + m_allScenes.Add(scene); + } + } + + public void PostInitialise() + { + if (!m_active) + return; + + Start(); // fake a 'phys start' to get things going + + //m_log.Warn(LogHeader + " Post-Initialised"); + } + + public void Close() + { + if (m_active) + { + } + m_scene = null; + m_active = false; + m_activeActors--; + } + + public string Name + { + get { return "RegionSyncPhysEngineModule"; } + } + + public bool IsSharedModule + { + get { return false; } + } + #endregion + + #region ICommandableModule Members + private readonly Commander m_commander = new Commander("phys"); + public ICommander CommandInterface + { + get { return m_commander; } + } + #endregion + + #region IPhysEngineToSceneConnectorModule members + + + public bool Active + { + get { return m_active; } + } + + public bool Synced + { + get + { + lock(m_client_lock) + { + return (m_PEToSceneConnectors.Count > 0); + } + } + } + + bool IPhysEngineToSceneConnectorModule.IsPhysEngineActor() + { + return PhysEngineToSceneConnectorModule.IsPhysEngineActorS; + } + bool IPhysEngineToSceneConnectorModule.IsPhysEngineScene() + { + return PhysEngineToSceneConnectorModule.IsPhysEngineSceneS; + } + bool IPhysEngineToSceneConnectorModule.IsActivePhysEngineScene() + { + return PhysEngineToSceneConnectorModule.IsActivePhysEngineSceneS; + } + + public static bool IsPhysEngineSceneS + { + get { return SceneToPhysEngineSyncServer.IsPhysEngineScene2S(); } + } + public static bool IsActivePhysEngineSceneS + { + get { return SceneToPhysEngineSyncServer.IsActivePhysEngineScene2S(); } + } + public static bool IsPhysEngineActorS + { + get { return (m_activeActors != 0); } + } + + /// + /// The scene is unknown by ODE so we have to look through the scenes to + /// find the one with this PhysicsActor so we can send the update. + /// + /// + public static void RouteUpdate(PhysicsActor pa) + { + SceneObjectPart sop; + ScenePresence sp; + Scene s = null; + foreach (Scene ss in m_allScenes) + { + try + { + sop = ss.GetSceneObjectPart(pa.UUID); + } + catch + { + sop = null; + } + if (sop != null) + { + s = ss; + break; + } + try + { + sp = ss.GetScenePresence(pa.UUID); + } + catch + { + sp = null; + } + if (sp != null) + { + s = ss; + break; + } + } + if (s != null) + { + if (s.PhysEngineToSceneConnectorModule != null) + { + s.PhysEngineToSceneConnectorModule.SendUpdate(pa); + } + else + { + Console.WriteLine("RouteUpdate: PhysEngineToSceneConnectorModule is null"); + } + } + else + { + Console.WriteLine("RouteUpdate: no SOP found for {0}", pa.UUID); + } + return; + } + + #endregion + + + #region Event Handlers + #endregion + + private void DebugSceneStats() + { + return; + /* + List avatars = m_scene.GetAvatars(); + List entities = m_scene.GetEntities(); + m_log.WarnFormat("{0} There are {1} avatars and {2} entities in the scene", LogHeader, avatars.Count, entities.Count); + */ + } + + public void SendUpdate(PhysicsActor pa) + { + if (this.m_physEngineToSceneConnector != null) + { + this.m_physEngineToSceneConnector.SendPhysUpdateAttributes(pa); + } + } + + #region Console Command Interface + //IMPORTANT: these functions should only be actived for the PhysEngineToSceneConnectorModule that is associated with the valid local scene + + private void InstallInterfaces() + { + Command cmdSyncStart = new Command("start", CommandIntentions.COMMAND_HAZARDOUS, SyncStart, "Begins synchronization with RegionSyncServer."); + //cmdSyncStart.AddArgument("server_port", "The port of the server to synchronize with", "Integer"); + + Command cmdSyncStop = new Command("stop", CommandIntentions.COMMAND_HAZARDOUS, SyncStop, "Stops synchronization with RegionSyncServer."); + //cmdSyncStop.AddArgument("server_address", "The IP address of the server to synchronize with", "String"); + //cmdSyncStop.AddArgument("server_port", "The port of the server to synchronize with", "Integer"); + + Command cmdSyncStatus = new Command("status", CommandIntentions.COMMAND_HAZARDOUS, SyncStatus, "Displays synchronization status."); + + //The following two commands are more for easier debugging purpose + Command cmdSyncSetQuarks = new Command("quarkSpace", CommandIntentions.COMMAND_HAZARDOUS, SetQuarkList, "Set the set of quarks to subscribe to. For debugging purpose. Should be issued before \"sync start\""); + cmdSyncSetQuarks.AddArgument("quarkSpace", "The (rectangle) space of quarks to subscribe, represented by x0_y0,x1_y1, the left-bottom and top-right corners of the rectangel space", "String"); + + Command cmdSyncSetQuarkSize = new Command("quarksize", CommandIntentions.COMMAND_HAZARDOUS, SetQuarkSize, "Set the size of each quark. For debugging purpose. Should be issued before \"sync quarks\""); + cmdSyncSetQuarkSize.AddArgument("quarksizeX", "The size on x axis of each quark", "Integer"); + cmdSyncSetQuarkSize.AddArgument("quarksizeY", "The size on y axis of each quark", "Integer"); + + m_commander.RegisterCommand("start", cmdSyncStart); + m_commander.RegisterCommand("stop", cmdSyncStop); + m_commander.RegisterCommand("status", cmdSyncStatus); + m_commander.RegisterCommand("quarkSpace", cmdSyncSetQuarks); + + lock (m_scene) + { + // Add this to our scene so scripts can call these functions + m_scene.RegisterModuleCommander(m_commander); + } + } + + + /// + /// Processes commandline input. Do not call directly. + /// + /// Commandline arguments + private void EventManager_OnPluginConsole(string[] args) + { + if (args[0] == "phys") + { + if (args.Length == 1) + { + m_commander.ProcessConsoleCommand("help", new string[0]); + return; + } + + string[] tmpArgs = new string[args.Length - 2]; + int i; + for (i = 2; i < args.Length; i++) + tmpArgs[i - 2] = args[i]; + + m_commander.ProcessConsoleCommand(args[1], tmpArgs); + } + } + + private void SyncStart(Object[] args) + { + Start(); + } + + private void Start() + { + m_serveraddr = m_scene.RegionInfo.PhysicsSyncServerAddress; + m_serverport = m_scene.RegionInfo.PhysicsSyncServerPort; + + lock (m_client_lock) + { + //m_log.Warn(LogHeader + " Starting synchronization"); + m_log.Warn(LogHeader + ": Starting RegionSyncPhysEngine"); + + //Only one remote scene to connect to. Subscribe to whatever specified in the config file. + //List quarkStringList = RegionSyncUtil.QuarkStringToStringList(m_quarkListString); + //InitPhysEngineToSceneConnector(quarkStringList); + InitPhysEngineToSceneConnector(m_subscriptionSpaceString); + } + } + + private void SetQuarkList(Object[] args) + { + m_subscriptionSpaceString = (string)args[0]; + + InitPhysEngineToSceneConnector(m_subscriptionSpaceString); + } + + private void SetQuarkSize(Object[] args) + { + QuarkInfo.SizeX = (int)args[0]; + QuarkInfo.SizeY = (int)args[1]; + + } + + private void InitPhysEngineToSceneConnector(string space) + { + + m_physEngineToSceneConnector = new PhysEngineToSceneConnector(m_scene, + m_serveraddr, m_serverport, m_debugWithViewer, /* space,*/ m_syncConfig); + if (m_physEngineToSceneConnector.Start()) + { + m_PEToSceneConnectors.Add(m_scene.RegionInfo.RegionName, m_physEngineToSceneConnector); + } + } + + private void SyncStop(Object[] args) + { + lock (m_client_lock) + { + //if (m_scriptEngineToSceneConnector == null) + if(m_PEToSceneConnectors.Count==0 && m_idlePEToSceneConnector==null) + { + m_log.Warn(LogHeader + " Already stopped"); + return; + } + + if (m_PEToSceneConnectors.Count > 0) + { + foreach (KeyValuePair valPair in m_PEToSceneConnectors) + { + PhysEngineToSceneConnector connector = valPair.Value; + if (connector == null) + { + continue; + } + connector.Stop(); + } + m_PEToSceneConnectors.Clear(); + } + else if (m_idlePEToSceneConnector != null) + { + m_idlePEToSceneConnector.Stop(); + m_idlePEToSceneConnector = null; + } + + //m_scriptEngineToSceneConnector.Stop(); + //m_scriptEngineToSceneConnector = null; + m_log.Warn(LogHeader+": Stopping synchronization"); + } + + //save script state and stop script instances + // TODO: Load balancing. next line commented out to compile + // m_scene.EventManager.TriggerPhysEngineSyncStop(); + //remove all objects + m_scene.DeleteAllSceneObjects(); + + } + + private void SyncStatus(Object[] args) + { + lock (m_client_lock) + { + if (m_PEToSceneConnectors.Count == 0) + { + m_log.Warn(LogHeader + " Not currently synchronized"); + return; + } + foreach (KeyValuePair pair in m_PEToSceneConnectors) + { + PhysEngineToSceneConnector sceneConnector = pair.Value; + sceneConnector.ReportStatus(); + } + } + } + #endregion + } +} diff --git a/OpenSim/Region/CoreModules/RegionSync/RegionSyncModule/SceneToPhysEngineConnector.cs b/OpenSim/Region/CoreModules/RegionSync/RegionSyncModule/SceneToPhysEngineConnector.cs new file mode 100644 index 0000000000..ea3601d98d --- /dev/null +++ b/OpenSim/Region/CoreModules/RegionSync/RegionSyncModule/SceneToPhysEngineConnector.cs @@ -0,0 +1,559 @@ +using System; +using System.IO; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Collections.Generic; +using System.Threading; +using OpenMetaverse; +using OpenMetaverse.Packets; +using OpenMetaverse.StructuredData; +using OpenSim.Framework; +using OpenSim.Region.Framework.Scenes; +using OpenSim.Region.Framework.Scenes.Serialization; +using OpenSim.Region.Framework.Interfaces; +using OpenSim.Region.Physics.Manager; +using log4net; + +namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule +{ + + + + //KittyL: NOTE -- We need to define an interface for all actors to connect into the Scene, + // e.g. IActorConnector, that runs on the Scene side, processes messages from actors, + // and apply Scene/Object operations. + + // The SceneToPhysEngineConnector acts as a thread on the RegionSyncServer to handle incoming + // messages from PhysEngineToSceneConnectors that run on Physics Engines. It connects the + // authoratative Scene with remote script engines. + public class SceneToPhysEngineConnector + { + #region SceneToPhysEngineConnector members + + object stats = new object(); + private DateTime lastStatTime; + private long msgsIn; + private long msgsOut; + private long bytesIn; + private long bytesOut; + private long pollBlocks; + private int lastTotalCount; + private int lastLocalCount; + private int lastRemoteCount; + + private int msgCount = 0; + + // The TcpClient this view uses to communicate with its RegionSyncClient + private TcpClient m_tcpclient; + // Set the addr and port for TcpListener + private IPAddress m_addr; + private Int32 m_port; + private int m_connection_number; + private Scene m_scene; + + object m_syncRoot = new object(); + Dictionary m_syncedAvatars = new Dictionary(); + + // A queue for incoming and outgoing traffic + private OpenMetaverse.BlockingQueue inbox = new OpenMetaverse.BlockingQueue(); + private OpenMetaverse.BlockingQueue outbox = new OpenMetaverse.BlockingQueue(); + + private ILog m_log; + + private Thread m_receive_loop; + private string m_regionName; + + private SceneToPhysEngineSyncServer m_syncServer = null; + + // A string of the format [REGION SYNC SCRIPT API (regionname)] for use in log headers + private string LogHeader + { + get + { + if (m_regionName == null) + return String.Format("[SceneToPhysEngineConnector #{0}]", m_connection_number); + return String.Format("[SceneToPhysEngineConnector #{0} ({1:10})]", m_connection_number, m_regionName); + } + } + + // A string of the format "RegionSyncClientView #X" for use in describing the object itself + public string Description + { + get + { + if (m_regionName == null) + return String.Format("RegionSyncPhysAPI #{0}", m_connection_number); + return String.Format("RegionSyncPhysAPI #{0} ({1:10})", m_connection_number, m_regionName); + } + } + + public int ConnectionNum + { + get { return m_connection_number; } + } + + public string GetStats() + { + string ret; + //lock (m_syncRoot) + // syncedAvCount = m_syncedAvatars.Count; + lock (stats) + { + double secondsSinceLastStats = DateTime.Now.Subtract(lastStatTime).TotalSeconds; + lastStatTime = DateTime.Now; + + // ret = String.Format("[{0,4}/{1,4}], [{2,4}/{3,4}], [{4,4}/{5,4}], [{6,4} ({7,4})], [{8,8} ({9,8:00.00})], [{10,4} ({11,4})], [{12,8} ({13,8:00.00})], [{14,8} ({15,4}]", + ret = String.Format("[{0,4}/{1,4}], [{2,6}/{3,6}], [{4,4}/{5,4}], [{6,6} ({7,6})], [{8,4} ({9,4})]", + //lastTotalCount, totalAvCount, // TOTAL AVATARS + //lastLocalCount, syncedAvCount, // LOCAL TO THIS CLIENT VIEW + //lastRemoteCount, totalAvCount - syncedAvCount, // REMOTE (SHOULD = TOTAL - LOCAL) + msgsIn, (int)(msgsIn / secondsSinceLastStats), + bytesIn, 8 * (bytesIn / secondsSinceLastStats / 1000000), // IN + msgsOut, (int)(msgsOut / secondsSinceLastStats), + bytesOut, 8 * (bytesOut / secondsSinceLastStats / 1000000), // OUT + pollBlocks, (int)(pollBlocks / secondsSinceLastStats)); // NUMBER OF TIMES WE BLOCKED WRITING TO SOCKET + + msgsIn = msgsOut = bytesIn = bytesOut = pollBlocks = 0; + } + return ret; + } + + // Check if the client is connected + public bool Connected + { get { return m_tcpclient.Connected; } } + + //private int QuarkInfo.SizeX; + //private int QuarkInfo.SizeY; + //private List m_quarkSubscriptions; + Dictionary m_quarkSubscriptions; + public Dictionary QuarkSubscriptionList + { + get { return m_quarkSubscriptions; } + } + + + + #endregion + + // Constructor + public SceneToPhysEngineConnector(int num, Scene scene, TcpClient client, SceneToPhysEngineSyncServer syncServer) + { + m_connection_number = num; + m_scene = scene; + m_tcpclient = client; + m_addr = ((IPEndPoint)client.Client.RemoteEndPoint).Address; + m_port = ((IPEndPoint)client.Client.RemoteEndPoint).Port; + m_syncServer = syncServer; + + //QuarkInfo.SizeX = quarkSizeX; + //QuarkInfo.SizeY = quarkSizeY; + + m_log = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType); + //m_log.WarnFormat("{0} Constructed", LogHeader); + + //Register for events from Scene.EventManager + //m_scene.EventManager.OnRezScript += SEConnectorOnRezScript; + //m_scene.EventManager.OnScriptReset += SEConnectorOnScriptReset; + //m_scene.EventManager.OnUpdateScript += SEConnectorOnUpdateScript; + + // Create a thread for the receive loop + m_receive_loop = new Thread(new ThreadStart(delegate() { ReceiveLoop(); })); + m_receive_loop.Name = Description; + //m_log.WarnFormat("{0} Started thread: {1}", LogHeader, m_receive_loop.Name); + m_receive_loop.Start(); + + //tell the remote script engine about the locX, locY of this authoritative scene + // SendSceneLoc(); + m_log.DebugFormat("{0}: SceneToPhysEngineConnector initialized", LogHeader); + } + + // Stop the listening thread, disconnecting the RegionSyncPhysEngine + public void Shutdown() + { + m_syncServer.RemoveSyncedPhysEngine(this); + // m_scene.EventManager.OnChatFromClient -= EventManager_OnChatFromClient; + // Abort ReceiveLoop Thread, close Socket and TcpClient + m_receive_loop.Abort(); + m_tcpclient.Client.Close(); + m_tcpclient.Close(); + + //m_scene.EventManager.OnRezScript -= SEConnectorOnRezScript; + //m_scene.EventManager.OnScriptReset -= SEConnectorOnScriptReset; + //m_scene.EventManager.OnUpdateScript -= SEConnectorOnUpdateScript; + } + + #region Send/Receive messages to/from the remote Physics Engine + + // Listen for messages from a RegionSyncClient + // *** This is the main thread loop for each connected client + private void ReceiveLoop() + { + //m_scene.EventManager.OnChatFromClient += new EventManager.ChatFromClientEvent(EventManager_OnChatFromClient); + + // Reset stats and time + lastStatTime = DateTime.Now; + msgsIn = msgsOut = bytesIn = bytesOut = 0; + + try + { + while (true) + { + RegionSyncMessage msg = GetMessage(); + lock (stats) + { + msgsIn++; + bytesIn += msg.Length; + } + lock (m_syncRoot) + HandleMessage(msg); + } + } + catch (Exception e) + { + m_log.WarnFormat("{0}: has disconnected: {1}", LogHeader, e.Message); + } + Shutdown(); + } + + // Get a message from the RegionSyncClient + private RegionSyncMessage GetMessage() + { + // Get a RegionSyncMessager from the incoming stream + RegionSyncMessage msg = new RegionSyncMessage(m_tcpclient.GetStream()); + //m_log.WarnFormat("{0} Received {1}", LogHeader, msg.ToString()); + return msg; + } + + // Handle an incoming message + // *** Perhaps this should not be synchronous with the receive + // We could handle messages from an incoming Queue + private void HandleMessage(RegionSyncMessage msg) + { + SceneToPhysEngineSyncServer.PhysLogMessage(true, msg); + msgCount++; + //string handlerMessage = ""; + switch (msg.Type) + { + case RegionSyncMessage.MsgType.ActorStop: + { + Shutdown(); + } + return; + /* + case RegionSyncMessage.MsgType.LoadBalanceRequest: + { + m_syncServer.HandleLoadBalanceRequest(this); + return; + } + + case RegionSyncMessage.MsgType.ActorStatus: + { + string status = Encoding.ASCII.GetString(msg.Data, 0, msg.Length); + ActorStatus actorStatus = (ActorStatus)Convert.ToInt32(status); + if (actorStatus == ActorStatus.Sync) + { + m_log.Debug(LogHeader + ": received ActorStatus " + actorStatus.ToString()); + m_syncServer.AddSyncedPhysEngine(this); + } + else + { + m_log.Warn(LogHeader + ": not supposed to received RegionSyncMessage.MsgType.ActorStatus==" + status.ToString()); + } + return; + } + */ + + case RegionSyncMessage.MsgType.PhysTerseUpdate: + { + HandlePhysTerseUpdate(msg); + return; + } + case RegionSyncMessage.MsgType.PhysOutOfBounds: + { + HandlePhysOutOfBounds(msg); + return; + } + case RegionSyncMessage.MsgType.PhysCollisionUpdate: + { + HandlePhysCollisionUpdate(msg); + return; + } + case RegionSyncMessage.MsgType.PhysUpdateAttributes: + { + HandlePhysUpdateAttributes(msg); + return; + } + default: + { + m_log.WarnFormat("{0} Unable to handle unsupported message type", LogHeader); + return; + } + } + } + + private void HandlePhysTerseUpdate(RegionSyncMessage msg) + { + OSDMap data = RegionSyncUtil.DeserializeMessage(msg, LogHeader); + try + { + UUID uuid = data["uuid"].AsUUID(); + // m_log.DebugFormat("{0}: received PhysUpdateAttributes for {1}", LogHeader, uuid); + PhysicsActor pa = FindPhysicsActor(uuid); + if (pa != null) + { + pa.RequestPhysicsterseUpdate(); + } + else + { + m_log.WarnFormat("{0}: terse update for unknown uuid {1}", LogHeader, uuid); + return; + } + } + catch (Exception e) + { + m_log.WarnFormat("{0}: EXCEPTION processing PhysTerseUpdate: {1}", LogHeader, e); + return; + } + return; + } + + private void HandlePhysOutOfBounds(RegionSyncMessage msg) + { + // TODO: + return; + } + + private void HandlePhysCollisionUpdate(RegionSyncMessage msg) + { + // TODO: + return; + } + + /// + /// The physics engine has some updates to the attributes. Unpack the parameters, find the + /// correct PhysicsActor and plug in the new values; + /// + /// + private void HandlePhysUpdateAttributes(RegionSyncMessage msg) + { + OSDMap data = RegionSyncUtil.DeserializeMessage(msg, LogHeader); + try + { + UUID uuid = data["uuid"].AsUUID(); + string actorID = data["actorID"].AsString(); + // m_log.DebugFormat("{0}: received PhysUpdateAttributes for {1}", LogHeader, uuid); + PhysicsActor pa = FindPhysicsActor(uuid); + if (pa != null) + { + pa.Size = data["size"].AsVector3(); + pa.Position = data["position"].AsVector3(); + pa.Force = data["force"].AsVector3(); + pa.Velocity = data["velocity"].AsVector3(); + pa.RotationalVelocity = data["rotationalVelocity"].AsVector3(); + pa.Acceleration = data["acceleration"].AsVector3(); + pa.Torque = data["torque"].AsVector3(); + pa.Orientation = data["orientation"].AsQuaternion(); + pa.IsPhysical = data["isPhysical"].AsBoolean(); // receive?? + pa.Flying = data["flying"].AsBoolean(); // receive?? + pa.Kinematic = data["kinematic"].AsBoolean(); // receive?? + pa.Buoyancy = (float)(data["buoyancy"].AsReal()); + pa.CollidingGround = data["isCollidingGround"].AsBoolean(); + pa.IsColliding = data["isCollidingGround"].AsBoolean(); + pa.ChangingActorID = actorID; + + pa.RequestPhysicsterseUpdate(); // tell the system the values have changed + } + else + { + m_log.WarnFormat("{0}: attribute update for unknown uuid {1}", LogHeader, uuid); + return; + } + } + catch (Exception e) + { + m_log.WarnFormat("{0}: EXCEPTION processing UpdateAttributes: {1}", LogHeader, e); + return; + } + return; + } + + // Find the physics actor whether it is an object or a scene presence + private PhysicsActor FindPhysicsActor(UUID uuid) + { + SceneObjectPart sop = m_scene.GetSceneObjectPart(uuid); + if (sop != null) + { + return sop.PhysActor; + } + ScenePresence sp = m_scene.GetScenePresence(uuid); + if (sp != null) + { + return sp.PhysicsActor; + } + return null; + } + + public void SendPhysUpdateAttributes(PhysicsActor pa) + { + // m_log.DebugFormat("{0}: sending PhysUpdateAttributes for {1}", LogHeader, pa.UUID); + OSDMap data = new OSDMap(15); + data["time"] = OSD.FromString(DateTime.Now.ToString("yyyyMMddHHmmssfff")); + data["localID"] = OSD.FromUInteger(pa.LocalID); + data["uuid"] = OSD.FromUUID(pa.UUID); + data["actorID"] = OSD.FromString(RegionSyncServerModule.ActorID); + data["size"] = OSD.FromVector3(pa.Size); + data["position"] = OSD.FromVector3(pa.Position); + data["force"] = OSD.FromVector3(pa.Force); + data["velocity"] = OSD.FromVector3(pa.Velocity); + data["rotationalVelocity"] = OSD.FromVector3(pa.RotationalVelocity); + data["acceleration"] = OSD.FromVector3(pa.Acceleration); + data["torque"] = OSD.FromVector3(pa.Torque); + data["orientation"] = OSD.FromQuaternion(pa.Orientation); + data["isPhysical"] = OSD.FromBoolean(pa.IsPhysical); + data["flying"] = OSD.FromBoolean(pa.Flying); + data["buoyancy"] = OSD.FromReal(pa.Buoyancy); + // data["isColliding"] = OSD.FromBoolean(pa.IsColliding); + // data["isCollidingGround"] = OSD.FromBoolean(pa.CollidingGround); + + RegionSyncMessage rsm = new RegionSyncMessage(RegionSyncMessage.MsgType.PhysUpdateAttributes, + OSDParser.SerializeJsonString(data)); + Send(rsm); + return; + } + + //For simplicity, we assume the subscription sent by PhysEngine is legistimate (no overlapping with other script engines, etc) + private void HandleQuarkSubscription(RegionSyncMessage msg) + { + string quarkString = Encoding.ASCII.GetString(msg.Data, 0, msg.Length); + m_log.Debug(LogHeader + ": received quark-string: " + quarkString); + + List quarkStringList = RegionSyncUtil.QuarkStringToStringList(quarkString); + //m_quarkSubscriptions = RegionSyncUtil.GetQuarkInfoList(quarkStringList, QuarkInfo.SizeX, QuarkInfo.SizeY); + List quarkList = RegionSyncUtil.GetQuarkInfoList(quarkStringList); + m_syncServer.RegisterQuarkSubscription(quarkList, this); + + m_quarkSubscriptions = new Dictionary(); + foreach (QuarkInfo quark in quarkList) + { + m_quarkSubscriptions.Add(quark.QuarkStringRepresentation, quark); + } + } + + private RegionSyncMessage PrepareObjectUpdateMessage(RegionSyncMessage.MsgType msgType, SceneObjectGroup sog) + { + OSDMap data = new OSDMap(3); + data["locX"] = OSD.FromUInteger(m_scene.RegionInfo.RegionLocX); + data["locY"] = OSD.FromUInteger(m_scene.RegionInfo.RegionLocY); + string sogxml = SceneObjectSerializer.ToXml2Format(sog); + data["sogXml"] = OSD.FromString(sogxml); + + RegionSyncMessage rsm = new RegionSyncMessage(msgType, OSDParser.SerializeJsonString(data)); + return rsm; + } + + private void SendSceneLoc() + { + uint locX = m_scene.RegionInfo.RegionLocX; + uint locY = m_scene.RegionInfo.RegionLocY; + + OSDMap data = new OSDMap(2); + data["locX"] = OSD.FromUInteger(locX); + data["locY"] = OSD.FromUInteger(locY); + Send(new RegionSyncMessage(RegionSyncMessage.MsgType.SceneLocation, OSDParser.SerializeJsonString(data))); + } + + public void Send(RegionSyncMessage msg) + { + if (msg.Type == RegionSyncMessage.MsgType.AvatarAppearance) + m_log.WarnFormat("{0} Sending AvatarAppearance to client manager", LogHeader); + + Send(msg.ToBytes()); + } + + private void Send(byte[] data) + { + if (m_tcpclient.Connected) + { + try + { + lock (stats) + { + msgsOut++; + bytesOut += data.Length; + } + m_tcpclient.GetStream().BeginWrite(data, 0, data.Length, ar => + { + if (m_tcpclient.Connected) + { + try + { + m_tcpclient.GetStream().EndWrite(ar); + } + catch (Exception) + { + m_log.WarnFormat("{0} Write to output stream failed", LogHeader); + } + } + }, null); + } + catch (IOException) + { + m_log.WarnFormat("{0} Physics Engine has disconnected.", LogHeader); + } + } + else + { + m_log.DebugFormat("{0} Attempt to send with no connection", LogHeader); + } + + } + + public void SendObjectUpdate(RegionSyncMessage.MsgType msgType, SceneObjectGroup sog) + { + Send(PrepareObjectUpdateMessage(msgType, sog)); + } + + #endregion Send/Receive messages to/from the remote Physics Engine + + #region Spacial query functions (should be eventually implemented within Scene) + /* + //This should be a function of Scene, but since we don't have the quark concept in Scene yet, + //for now we implement it here. + //Ideally, for quark based space representation, the Scene has a list of quarks, and each quark points + //to a list of objects within that quark. Then it's much easier to return the right set of objects within + //a certain space. (Or use DB that supports spatial queries.) + List GetObjectsInGivenSpace(Scene scene, Dictionary quarkSubscriptions) + { + List entities = m_scene.GetEntities(); + List sogList = new List(); + foreach (EntityBase e in entities) + { + if (e is SceneObjectGroup) + { + SceneObjectGroup sog = (SceneObjectGroup)e; + string quarkID = RegionSyncUtil.GetQuarkIDByPosition(sog.AbsolutePosition); + if (m_quarkSubscriptions.ContainsKey(quarkID)) + { + sogList.Add(sog); + } + } + } + + return sogList; + } + */ + + #endregion + + #region Load balancing functions + /* + public void SendLoadBalanceRejection(string response) + { + RegionSyncMessage msg = new RegionSyncMessage(RegionSyncMessage.MsgType.LoadBalanceRejection, response); + Send(msg); + } + */ + #endregion + } +} diff --git a/OpenSim/Region/CoreModules/RegionSync/RegionSyncModule/SceneToPhysEngineSyncServer.cs b/OpenSim/Region/CoreModules/RegionSync/RegionSyncModule/SceneToPhysEngineSyncServer.cs new file mode 100644 index 0000000000..a4cbb96c81 --- /dev/null +++ b/OpenSim/Region/CoreModules/RegionSync/RegionSyncModule/SceneToPhysEngineSyncServer.cs @@ -0,0 +1,765 @@ +using System; +using System.IO; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Collections.Generic; +using System.Threading; +using OpenSim.Framework; +using OpenSim.Region.CoreModules.Framework.InterfaceCommander; +using OpenSim.Region.Framework.Scenes; +using OpenSim.Region.Framework.Interfaces; +using log4net; + +using OpenMetaverse; +using OpenMetaverse.StructuredData; +using OpenSim.Region.Framework.Scenes.Serialization; +using OpenSim.Region.Physics.Manager; + +namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule +{ + //Information of a registered idle physics engine. + //Note, this is a temporary solution to inlcude idle physics engines here. + //In the future, there might be a independent load balaner that keeps track + //of available idle hardware. + public class IdlePhysEngineInfo + { + public TcpClient TClient; + //public IPAddress PhysEngineIPAddr; + //public int PhysEnginePort; + public string ID; + + //Will be used to store the overloaded PE that has send LB request and paired with this idle PE + public SceneToPhysEngineConnector AwaitOverloadedSE=null; + + public IdlePhysEngineInfo(TcpClient tclient) + { + if(tclient==null) return; + TClient = tclient; + IPAddress ipAddr = ((IPEndPoint)tclient.Client.RemoteEndPoint).Address; + int port = ((IPEndPoint)tclient.Client.RemoteEndPoint).Port; + ID = ipAddr.ToString()+":"+port; + } + } + + //Here is the per actor type listening server for physics Engines. + public class SceneToPhysEngineSyncServer : ISceneToPhysEngineServer, ICommandableModule + { + #region SceneToPhysEngineSyncServer members + // Set the addr and port for TcpListener + private IPAddress m_addr; + private Int32 m_port; + + //this field is only meaning for the QuarkInfo records on the Scene side + private SceneToPhysEngineConnector m_peConnector=null; + public SceneToPhysEngineConnector PEConnector + { + get { return m_peConnector; } + set { m_peConnector = value; } + } + + private int peCounter; + + // static counters that are used to compute global configuration state + private static int m_syncServerInitialized = 0; + private static int m_totalConnections = 0; + private static List m_allScenes = new List(); + + // The local scene. + private Scene m_scene; + + private ILog m_log; + + // The listener and the thread which listens for connections from client managers + private TcpListener m_listener; + private Thread m_listenerThread; + + private object m_physEngineConnector_lock = new object(); + //private Dictionary m_physEngineConnectors = new Dictionary(); + private List m_physEngineConnectors = new List(); + // the last connector created + private SceneToPhysEngineConnector m_sceneToPhysEngineConnector = null; + + //list of idle physics engines that have registered. + private List m_idlePhysEngineList = new List(); + + //List of all quarks, each using the concatenation of x,y values of its left-bottom corners, + // where the x,y values are the offset position in the scene. + private Dictionary m_quarksInScene = new Dictionary(); + + private string LogHeader = "[SCENE TO PHYS ENGINE SYNC SERVER]"; + + //Quark related info + //private int QuarkInfo.SizeX; + //private int QuarkInfo.SizeY; + + #region ICommandableModule Members + private readonly Commander m_commander = new Commander("phys"); + public ICommander CommandInterface + { + get { return m_commander; } + } + + private void InstallInterfaces() + { + // Command cmdSyncStart = new Command("start", CommandIntentions.COMMAND_HAZARDOUS, SyncStart, "Begins synchronization with RegionSyncServer."); + //cmdSyncStart.AddArgument("server_port", "The port of the server to synchronize with", "Integer"); + + // Command cmdSyncStop = new Command("stop", CommandIntentions.COMMAND_HAZARDOUS, SyncStop, "Stops synchronization with RegionSyncServer."); + //cmdSyncStop.AddArgument("server_address", "The IP address of the server to synchronize with", "String"); + //cmdSyncStop.AddArgument("server_port", "The port of the server to synchronize with", "Integer"); + + Command cmdSyncStatus = new Command("status", CommandIntentions.COMMAND_HAZARDOUS, SyncStatus, "Displays synchronization status."); + + //The following two commands are more for easier debugging purpose + // Command cmdSyncSetQuarks = new Command("quarkSpace", CommandIntentions.COMMAND_HAZARDOUS, SetQuarkList, "Set the set of quarks to subscribe to. For debugging purpose. Should be issued before \"sync start\""); + // cmdSyncSetQuarks.AddArgument("quarkSpace", "The (rectangle) space of quarks to subscribe, represented by x0_y0,x1_y1, the left-bottom and top-right corners of the rectangel space", "String"); + + // Command cmdSyncSetQuarkSize = new Command("quarksize", CommandIntentions.COMMAND_HAZARDOUS, SetQuarkSize, "Set the size of each quark. For debugging purpose. Should be issued before \"sync quarks\""); + // cmdSyncSetQuarkSize.AddArgument("quarksizeX", "The size on x axis of each quark", "Integer"); + // cmdSyncSetQuarkSize.AddArgument("quarksizeY", "The size on y axis of each quark", "Integer"); + + // m_commander.RegisterCommand("start", cmdSyncStart); + // m_commander.RegisterCommand("stop", cmdSyncStop); + m_commander.RegisterCommand("status", cmdSyncStatus); + // m_commander.RegisterCommand("quarkSpace", cmdSyncSetQuarks); + + lock (m_scene) + { + // Add this to our scene so scripts can call these functions + m_scene.RegisterModuleCommander(m_commander); + } + } + + /// + /// Processes commandline input. Do not call directly. + /// + /// Commandline arguments + private void EventManager_OnPluginConsole(string[] args) + { + if (args[0] == "phys") + { + if (args.Length == 1) + { + m_commander.ProcessConsoleCommand("help", new string[0]); + return; + } + + string[] tmpArgs = new string[args.Length - 2]; + int i; + for (i = 2; i < args.Length; i++) + tmpArgs[i - 2] = args[i]; + + m_commander.ProcessConsoleCommand(args[1], tmpArgs); + } + } + + private void SyncStart(Object[] args) + { + return; + } + private void SyncStop(Object[] args) + { + return; + } + private void SyncStatus(Object[] args) + { + lock (m_physEngineConnector_lock) + { + if (m_physEngineConnectors.Count == 0) + { + m_log.Warn(LogHeader + " Not currently synchronized"); + return; + } + m_log.Warn(LogHeader + " Synchronized"); + foreach (SceneToPhysEngineConnector pec in m_physEngineConnectors) + { + m_log.Warn(pec.GetStats()); + } + } + } + + #endregion + + // Check if any of the client views are in a connected state + public bool IsPhysEngineScene() { return SceneToPhysEngineSyncServer.IsPhysEngineScene2S(); } + public bool IsActivePhysEngineScene() { return SceneToPhysEngineSyncServer.IsActivePhysEngineScene2S(); } + public bool IsPhysEngineActor() { return SceneToPhysEngineSyncServer.IsPhysEngineActorS; } + + public bool Synced + { + get { return (m_physEngineConnectors.Count > 0); } + } + public static bool IsPhysEngineSceneS + { + get { return (SceneToPhysEngineSyncServer.m_syncServerInitialized > 0); } + } + public static bool IsPhysEngineScene2S() + { + return (SceneToPhysEngineSyncServer.m_syncServerInitialized > 0); + } + public static bool IsActivePhysEngineSceneS + { + get { + System.Console.WriteLine("IsActivePhysEngineScene: si={0} tc={1}", + SceneToPhysEngineSyncServer.m_syncServerInitialized, + SceneToPhysEngineSyncServer.m_totalConnections); + return (SceneToPhysEngineSyncServer.m_syncServerInitialized > 0 + && SceneToPhysEngineSyncServer.m_totalConnections > 0); + } + } + public static bool IsActivePhysEngineScene2S() + { + return (SceneToPhysEngineSyncServer.m_syncServerInitialized > 0 + && SceneToPhysEngineSyncServer.m_totalConnections > 0); + } + public static bool IsPhysEngineActorS + { + get { return PhysEngineToSceneConnectorModule.IsPhysEngineActorS; } + } + + /// + /// The scene is unknown by ODE so we have to look through the scenes to + /// find the one with this PhysicsActor so we can send the update. + /// + /// + public static void RouteUpdate(PhysicsActor pa) + { + SceneObjectPart sop = null; + Scene s = null; + foreach (Scene ss in m_allScenes) + { + try + { + sop = ss.GetSceneObjectPart(pa.UUID); + } + catch + { + sop = null; + } + if (sop != null) + { + s = ss; + break; + } + else + { + ScenePresence sp = ss.GetScenePresence(pa.UUID); + if (sp != null) + { + s = ss; + break; + } + } + } + if (s != null) + { + if (s.SceneToPhysEngineSyncServer != null) + { + s.SceneToPhysEngineSyncServer.SendUpdate(pa); + } + else + { + Console.WriteLine("RouteUpdate: SceneToPhysEngineSyncServer is not available"); + } + } + else + { + Console.WriteLine("RouteUpdate: no SOP for update of {0}", pa.UUID); + } + return; + } + + public void SendUpdate(PhysicsActor pa) + { + // m_log.DebugFormat("{0}: SendUpdate for {1}", LogHeader, pa.LocalID); + if (m_sceneToPhysEngineConnector != null) + { + this.m_sceneToPhysEngineConnector.SendPhysUpdateAttributes(pa); + } + } + + #endregion + + // Constructor + public SceneToPhysEngineSyncServer(Scene scene, string addr, int port) + { + if (QuarkInfo.SizeX == -1 || QuarkInfo.SizeY == -1) + { + m_log.Error(LogHeader + " QuarkInfo.SizeX or QuarkInfo.SizeY has not been configured yet."); + Environment.Exit(0); ; + } + + m_log = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType); + //m_log.Warn(LogHeader + "Constructed"); + m_scene = scene; + m_addr = IPAddress.Parse(addr); + m_port = port; + + m_scene.RegisterModuleInterface(this); + + // remember all the scenes that are configured for connection to physics engine + if (!m_allScenes.Contains(m_scene)) + { + m_allScenes.Add(m_scene); + } + + InitQuarksInScene(); + SubscribeToEvents(); + m_scene.EventManager.OnPluginConsole += EventManager_OnPluginConsole; + InstallInterfaces(); + } + + + private void SubscribeToEvents() + { + } + + private void UnSubscribeToEvents() + { + } + + // Start the server + public void Start() + { + SceneToPhysEngineSyncServer.m_syncServerInitialized++; + m_listenerThread = new Thread(new ThreadStart(Listen)); + m_listenerThread.Name = "SceneToPhysEngineSyncServer Listener"; + m_log.DebugFormat("{0}: Starting {1} thread", LogHeader, m_listenerThread.Name); + m_listenerThread.Start(); + // m_log.DebugFormat("{0}: Started", LogHeader); + } + + + + // Stop the server and disconnect all RegionSyncClients + public void Shutdown() + { + m_log.DebugFormat("{0}: Shutdown", LogHeader); + SceneToPhysEngineSyncServer.m_syncServerInitialized--; + // Stop the listener and listening thread so no new clients are accepted + m_listener.Stop(); + m_listenerThread.Abort(); + m_listenerThread = null; + + // Stop all existing SceneTOSEConnectors + //TO FINISH + foreach (SceneToPhysEngineConnector peConnector in m_physEngineConnectors) + { + peConnector.Shutdown(); + } + m_physEngineConnectors.Clear(); + + UnSubscribeToEvents(); + } + + private void InitQuarksInScene() + { + List quarkList = RegionSyncUtil.GetAllQuarksInScene(); + foreach (QuarkInfo quark in quarkList) + { + m_quarksInScene.Add(quark.QuarkStringRepresentation, quark); + } + } + + public void RegisterQuarkSubscription(List quarkSubscriptions, SceneToPhysEngineConnector peConnector) + { + foreach (QuarkInfo quark in quarkSubscriptions) + { + string quarkID = quark.QuarkStringRepresentation; + // TODO: does the physics engine connect to quarks. Next line commented out. + // m_quarksInScene[quarkID].PEConnector = peConnector; + m_log.Debug(LogHeader + ": " + quarkID + " subscribed by "+peConnector.Description); + } + } + + // Add a connector to a physics engine + public void AddSyncedPhysEngine(SceneToPhysEngineConnector peConnector) + { + lock (m_physEngineConnector_lock) + { + m_physEngineConnectors.Add(peConnector); + m_sceneToPhysEngineConnector = peConnector; + } + } + + // Remove the client view from the list and decrement synced client counter + public void RemoveSyncedPhysEngine(SceneToPhysEngineConnector peConnector) + { + lock (m_physEngineConnector_lock) + { + //Dictionary currentlist = m_physEngineConnectors; + //Dictionary newlist = new Dictionary(currentlist); + m_physEngineConnectors.Remove(peConnector); + // Threads holding the previous version of the list can keep using it since + // they will not hold it for long and get a new copy next time they need to iterate + //m_physEngineConnectors = newlist; + } + } + + // Listen for connections from a new RegionSyncClient + // When connected, start the ReceiveLoop for the new client + private void Listen() + { + m_listener = new TcpListener(m_addr, m_port); + + try + { + // Start listening for clients + m_listener.Start(); + while (true) + { + // *** Move/Add TRY/CATCH to here, but we don't want to spin loop on the same error + m_log.WarnFormat(LogHeader + ": Listening for new connections on port {0}...", m_port.ToString()); + TcpClient tcpclient = m_listener.AcceptTcpClient(); + IPAddress addr = ((IPEndPoint)tcpclient.Client.RemoteEndPoint).Address; + int port = ((IPEndPoint)tcpclient.Client.RemoteEndPoint).Port; + SceneToPhysEngineSyncServer.m_totalConnections++; + // m_log.DebugFormat("{0}: m_totalConnections = {1}", LogHeader, SceneToPhysEngineSyncServer.m_totalConnections); + + ActorStatus actorStatus = GetActorStatus(tcpclient); + + switch (actorStatus) + { + case ActorStatus.Sync: + // Add the SceneToPhysEngineConnector to the list + SceneToPhysEngineConnector sceneToPEConnector = new SceneToPhysEngineConnector(++peCounter, m_scene, tcpclient, this); + AddSyncedPhysEngine(sceneToPEConnector); + break; + case ActorStatus.Idle: + IdlePhysEngineInfo idleSE = new IdlePhysEngineInfo(tcpclient); + m_log.DebugFormat("{0}: adding an idle SE ({1}:{2})", LogHeader, addr, port); + m_idlePhysEngineList.Add(idleSE); + break; + default: + m_log.DebugFormat("{0}: Unknown actor status", LogHeader); + break; + } + + } + } + catch (SocketException e) + { + m_log.WarnFormat("{0}: [Listen] SocketException: {1}", LogHeader, e); + } + } + + /* + public void RegisterSyncedPhysEngine(SceneToPhysEngineConnector sceneToSEConnector) + { + //first, remove it from the idle list + m_idlePhysEngineList.Remove(sceneToSEConnector); + + //now, added to the synced SE list + AddSyncedPhysEngine(sceneToSEConnector); + } + * */ + + + // Broadcast a message to all connected RegionSyncClients + public void SendToAllConnectedPE(RegionSyncMessage msg) + { + if (m_physEngineConnectors.Count > 0) + { + m_log.Debug(LogHeader + ": region " + m_scene.RegionInfo.RegionName + " Broadcast to PhysEngine, msg " + msg.Type); + foreach (SceneToPhysEngineConnector peConnector in m_physEngineConnectors) + { + peConnector.Send(msg); + } + } + + } + + //TO FINISH: Find the right SceneToSEConnector to forward the message + public void SendToPE(RegionSyncMessage.MsgType msgType, SceneObjectGroup sog) + { + SceneToPhysEngineConnector peConnector = GetSceneToPEConnector(sog); + if (peConnector != null) + { + peConnector.SendObjectUpdate(msgType, sog); + } + } + + //This is to send a message, rsm, to phys engine, and the message is about object SOG. E.g. RemovedObject + public void SendToPE(RegionSyncMessage rsm, SceneObjectGroup sog) + { + SceneToPhysEngineConnector peConnector = GetSceneToPEConnector(sog); + if (peConnector != null) + { + peConnector.Send(rsm); + } + } + + + private SceneToPhysEngineConnector GetSceneToPEConnector(SceneObjectGroup sog) + { + if (m_physEngineConnectors.Count == 0) + return null; + if (sog == null) + { + return m_physEngineConnectors[0]; + } + else + { + //Find the right SceneToSEConnector by the object's position + //TO FINISH: Map the object to a quark first, then map the quark to SceneToSEConnector + string quarkID = RegionSyncUtil.GetQuarkIDByPosition(sog.AbsolutePosition); + // TODO: connection of physics engine to quarks. Next line commented out + // SceneToPhysEngineConnector peConnector = m_quarksInScene[quarkID].PEConnector; + + if (PEConnector == null) + { + m_log.Warn(LogHeader + sog.AbsolutePosition.ToString() + " not covered by any physics engine"); + } + + return PEConnector; + } + + } + + private ActorStatus GetActorStatus(TcpClient tcpclient) + { + m_log.Debug(LogHeader+ ": Get Actor status"); + + RegionSyncMessage msg = new RegionSyncMessage(tcpclient.GetStream()); + ActorStatus actorStatus; + switch (msg.Type) + { + case RegionSyncMessage.MsgType.ActorStatus: + { + string status = Encoding.ASCII.GetString(msg.Data, 0, msg.Length); + m_log.Debug(LogHeader + ": recv status: " + status); + actorStatus = (ActorStatus)Convert.ToInt32(status); + break; + } + default: + { + m_log.Error(LogHeader + ": Expect Message Type: ActorStatus"); + RegionSyncMessage.HandleError("[REGION SYNC SERVER]", msg, String.Format("{0} Expect Message Type: ActorType", "[REGION SYNC SERVER]")); + return ActorStatus.Null; + } + } + return actorStatus; + } + + + #region Event Handlers + + #endregion Event Handlers + + #region Load balancing members and functions + /* + //keep track of idle physics engines that are in the process of load balancing (they are off the idle list, but not a working physics engine yet (not sync'ing with Scene yet)). + private Dictionary m_loadBalancingIdleSEs = new Dictionary(); + public void HandleLoadBalanceRequest(SceneToPhysEngineConnector seConnctor) + { + //Let's start a thread to do the job, so that we can return quickly and don't block on ReceiveLoop() + + Thread partitionThread = new Thread(new ParameterizedThreadStart(TriggerLoadBalanceProcess)); + partitionThread.Name = "TriggerLoadBalanceProcess"; + partitionThread.Start((object)seConnctor); + } + + public void TriggerLoadBalanceProcess(object arg) + { + SceneToPhysEngineConnector seConnctor = (SceneToPhysEngineConnector)arg; + IdlePhysEngineInfo idlePhysEngineInfo = GetIdlePhysEngineConnector(); + if (idlePhysEngineInfo != null) + { + RegionSyncMessage msg = new RegionSyncMessage(RegionSyncMessage.MsgType.LoadMigrationNotice); + Send(idlePhysEngineInfo.TClient, msg.ToBytes()); + m_log.Debug(LogHeader + ": HandleLoadBalanceRequest from " + seConnctor.Description + ", picked idle SE: " + idlePhysEngineInfo.ID); + + //keep track of which overload physics engine is paired up with which idle physics engine + idlePhysEngineInfo.AwaitOverloadedSE = seConnctor; + m_loadBalancingIdleSEs.Add(idlePhysEngineInfo.ID, idlePhysEngineInfo); + + m_log.Debug("ToSEConnector portal: local -" + + ((IPEndPoint)idlePhysEngineInfo.TClient.Client.LocalEndPoint).Address.ToString() + ":" + ((IPEndPoint)idlePhysEngineInfo.TClient.Client.LocalEndPoint).Port + + "; remote - " + ((IPEndPoint)idlePhysEngineInfo.TClient.Client.RemoteEndPoint).Address.ToString() + ":" + + ((IPEndPoint)idlePhysEngineInfo.TClient.Client.RemoteEndPoint).Port); + + //Now we expect the idle physics engine to reply back + msg = new RegionSyncMessage(idlePhysEngineInfo.TClient.GetStream()); + if (msg.Type != RegionSyncMessage.MsgType.LoadMigrationListenerInitiated) + { + m_log.Warn(LogHeader + ": should receive a message of type LoadMigrationListenerInitiated, but received " + msg.Type.ToString()); + } + else + { + //Before the load is migrated from overloaded physics engine to the idle engine, sync with the DB to update the state in DB + List entities = m_scene.GetEntities(); + foreach (EntityBase entity in entities) + { + if (!entity.IsDeleted && entity is SceneObjectGroup && ((SceneObjectGroup)entity).HasGroupChanged) + { + m_scene.ForceSceneObjectBackup((SceneObjectGroup)entity); + } + } + + OSDMap data = DeserializeMessage(msg); + if (!data.ContainsKey("ip") || !data.ContainsKey("port") ) + { + m_log.Warn(LogHeader + ": parameters missing in SceneLocation message from Scene, need to have ip, port"); + return; + } + //echo the information back to the overloaded physics engine + seConnctor.Send(new RegionSyncMessage(RegionSyncMessage.MsgType.LoadBalanceResponse, OSDParser.SerializeJsonString(data))); + + m_log.Debug(LogHeader + " now remove physics engine " + idlePhysEngineInfo.ID + " from idle SE list, and create SceneToPhysEngineConnector to it"); + //create a SceneToSEConnector for the idle physics engine, who will be sync'ing with this SyncServer soon + SceneToPhysEngineConnector sceneToSEConnector = new SceneToPhysEngineConnector(++peCounter, m_scene, idlePhysEngineInfo.TClient, this); + //Now remove the physics engine from the idle SE list + m_idlePhysEngineList.Remove(idlePhysEngineInfo); + //AddSyncedPhysEngine(sceneToSEConnector); + } + + } + else + { + seConnctor.SendLoadBalanceRejection("no idle physics engines"); + } + } + */ + + HashSet exceptions = new HashSet(); + private OSDMap DeserializeMessage(RegionSyncMessage msg) + { + OSDMap data = null; + try + { + data = OSDParser.DeserializeJson(Encoding.ASCII.GetString(msg.Data, 0, msg.Length)) as OSDMap; + } + catch (Exception e) + { + lock (exceptions) + // If this is a new message, then print the underlying data that caused it + if (!exceptions.Contains(e.Message)) + m_log.Error(LogHeader + " " + Encoding.ASCII.GetString(msg.Data, 0, msg.Length)); + data = null; + } + return data; + } + + private void Send(TcpClient tcpclient, byte[] data) + { + if (tcpclient.Connected) + { + try + { + tcpclient.GetStream().BeginWrite(data, 0, data.Length, ar => + { + if (tcpclient.Connected) + { + try + { + tcpclient.GetStream().EndWrite(ar); + } + catch (Exception) + { } + } + }, null); + } + catch (IOException) + { + m_log.WarnFormat("{0} physics Engine has disconnected.", LogHeader); + } + } + } + + private IdlePhysEngineInfo GetIdlePhysEngineConnector() + { + if (m_idlePhysEngineList.Count == 0) + return null; + IdlePhysEngineInfo idleSEInfo = m_idlePhysEngineList[0]; + m_idlePhysEngineList.Remove(idleSEInfo); + return idleSEInfo; + } + + #endregion Load balancing functions + + #region Message Logging + public static bool logInput = false; + public static bool logOutput = true; + public static bool logEnabled = true; + private class PhysMsgLogger + { + public DateTime startTime; + public string path = null; + public System.IO.TextWriter Log = null; + } + private static PhysMsgLogger logWriter = null; + private static TimeSpan logMaxFileTime = new TimeSpan(0, 5, 0); // (h,m,s) => 5 minutes + public static string logDir = "/stats/stats"; + private static object logLocker = new Object(); + + public static void PhysLogMessage(bool direction, RegionSyncMessage rsm) + { + if (!logEnabled) return; // save to work of the ToStringFull if not enabled + PhysLogMessage(direction, rsm.ToStringFull()); + } + + /// + /// Log a physics bucket message + /// + /// True of message originated from the agent + /// the message to log + public static void PhysLogMessage(bool direction, string msg) + { + if (!logEnabled) return; + + lock (logLocker) + { + try + { + DateTime now = DateTime.Now; + if (logWriter == null || (now > logWriter.startTime + logMaxFileTime)) + { + if (logWriter != null && logWriter.Log != null) + { + logWriter.Log.Close(); + logWriter.Log.Dispose(); + logWriter.Log = null; + } + + // First log file or time has expired, start writing to a new log file + logWriter = new PhysMsgLogger(); + logWriter.startTime = now; + logWriter.path = (logDir.Length > 0 ? logDir + System.IO.Path.DirectorySeparatorChar.ToString() : "") + + String.Format("physics-{0}.log", now.ToString("yyyyMMddHHmmss")); + logWriter.Log = new StreamWriter(File.Open(logWriter.path, FileMode.Append, FileAccess.Write)); + } + if (logWriter != null && logWriter.Log != null) + { + StringBuilder buff = new StringBuilder(); + buff.Append(now.ToString("yyyyMMddHHmmssfff")); + buff.Append(" "); + buff.Append(direction ? "A->S:" : "S->A:"); + buff.Append(msg); + buff.Append("\r\n"); + logWriter.Log.Write(buff.ToString()); + } + } + catch (Exception e) + { + // m_log.ErrorFormat("{0}: FAILURE WRITING TO LOGFILE: {1}", LogHeader, e); + logEnabled = false; + } + } + return; + } + + public static void PhysLogMessageClose() + { + if (logWriter != null && logWriter.Log != null) + { + logWriter.Log.Close(); + logWriter.Log.Dispose(); + logWriter.Log = null; + logWriter = null; + } + logEnabled = false; + } + #endregion Message Logging + } +}