diff --git a/OpenSim/Region/CoreModules/RegionSync/RegionSyncModule/SymmetricSync/RegionSyncModule.cs b/OpenSim/Region/CoreModules/RegionSync/RegionSyncModule/SymmetricSync/RegionSyncModule.cs index b24e4db3e7..d4fc648788 100755 --- a/OpenSim/Region/CoreModules/RegionSync/RegionSyncModule/SymmetricSync/RegionSyncModule.cs +++ b/OpenSim/Region/CoreModules/RegionSync/RegionSyncModule/SymmetricSync/RegionSyncModule.cs @@ -12,10 +12,12 @@ using OpenSim.Framework.Client; using OpenSim.Region.CoreModules.Framework.InterfaceCommander; using OpenSim.Region.Framework.Interfaces; using OpenSim.Region.Framework.Scenes; +using OpenSim.Region.Framework.Scenes.Serialization; using log4net; using System.Net; using System.Net.Sockets; using System.Threading; +using System.Text; using Mono.Addins; @@ -240,6 +242,7 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule } private IConfig m_sysConfig = null; + private string LogHeader = "[REGION SYNC MODULE]"; //The list of SyncConnectors. ScenePersistence could have multiple SyncConnectors, each connecting to a differerent actor. //An actor could have several SyncConnectors as well, each connecting to a ScenePersistence that hosts a portion of the objects/avatars @@ -313,6 +316,7 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule { GetRemoteSyncListenerInfo(); StartSyncConnections(); + DoInitialSync(); } } @@ -344,21 +348,21 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule { foreach (RegionSyncListenerInfo remoteListener in m_remoteSyncListeners) { - SyncConnector syncConnector = new SyncConnector(m_syncConnectorNum++, remoteListener); - if (syncConnector.Start()) + SyncConnector syncConnector = new SyncConnector(m_syncConnectorNum++, remoteListener, this); + if (syncConnector.Connect()) { + syncConnector.StartCommThreads(); AddSyncConnector(syncConnector); } } } - - public void AddSyncConnector(TcpClient tcpclient) + //To be called when a SyncConnector needs to be created by that the local listener receives a connection request + public void AddNewSyncConnector(TcpClient tcpclient) { - //IPAddress addr = ((IPEndPoint)tcpclient.Client.RemoteEndPoint).Address; - //int port = ((IPEndPoint)tcpclient.Client.RemoteEndPoint).Port; - - SyncConnector syncConnector = new SyncConnector(m_syncConnectorNum++, tcpclient); + //Create a SynConnector due to an incoming request, and starts its communication threads + SyncConnector syncConnector = new SyncConnector(m_syncConnectorNum++, tcpclient, this); + syncConnector.StartCommThreads(); AddSyncConnector(syncConnector); } @@ -402,13 +406,139 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule { foreach (SyncConnector syncConnector in m_syncConnectors) { - syncConnector.Stop(); + syncConnector.Shutdown(); } m_syncConnectors.Clear(); } } + private void DoInitialSync() + { + m_scene.DeleteAllSceneObjects(); + + SendSyncMessage(SymmetricSyncMessage.MsgType.RegionName, m_scene.RegionInfo.RegionName); + m_log.WarnFormat("Sending region name: \"{0}\"", m_scene.RegionInfo.RegionName); + + SendSyncMessage(SymmetricSyncMessage.MsgType.GetTerrain); + //Send(new RegionSyncMessage(RegionSyncMessage.MsgType.GetObjects)); + //Send(new RegionSyncMessage(RegionSyncMessage.MsgType.GetAvatars)); + + //We'll deal with Event a bit later + + // Register for events which will be forwarded to authoritative scene + // m_scene.EventManager.OnNewClient += EventManager_OnNewClient; + //m_scene.EventManager.OnMakeRootAgent += EventManager_OnMakeRootAgent; + //m_scene.EventManager.OnMakeChildAgent += EventManager_OnMakeChildAgent; + //m_scene.EventManager.OnClientClosed += new EventManager.ClientClosed(RemoveLocalClient); + } + + /// + /// This function will enqueue a message for each SyncConnector in the connector's outgoing queue. + /// Each SyncConnector has a SendLoop thread to send the messages in its outgoing queue. + /// + /// + /// + private void SendSyncMessage(SymmetricSyncMessage.MsgType msgType, string data) + { + //See RegionSyncClientView for initial implementation by Dan Lake + + SymmetricSyncMessage msg = new SymmetricSyncMessage(msgType, data); + ForEachSyncConnector(delegate(SyncConnector syncConnector) + { + syncConnector.Send(msg); + }); + } + + private void SendSyncMessage(SymmetricSyncMessage.MsgType msgType) + { + //See RegionSyncClientView for initial implementation by Dan Lake + + SendSyncMessage(msgType, ""); + } + + public void ForEachSyncConnector(Action action) + { + List closed = null; + foreach (SyncConnector syncConnector in m_syncConnectors) + { + // If connected, send the message. + if (syncConnector.Connected) + { + action(syncConnector); + } + // Else, remove the SyncConnector from the list + else + { + if (closed == null) + closed = new List(); + closed.Add(syncConnector); + } + } + + if (closed != null) + { + foreach (SyncConnector connector in closed) + { + RemoveSyncConnector(connector); + } + } + } + + /// + /// The handler for processing incoming sync messages. + /// + /// + public void HandleIncomingMessage(SymmetricSyncMessage msg) + { + switch (msg.Type) + { + case SymmetricSyncMessage.MsgType.GetTerrain: + { + SendSyncMessage(SymmetricSyncMessage.MsgType.Terrain, m_scene.Heightmap.SaveToXmlString()); + return; + } + case SymmetricSyncMessage.MsgType.Terrain: + { + m_scene.Heightmap.LoadFromXmlString(Encoding.ASCII.GetString(msg.Data, 0, msg.Length)); + m_log.Debug(LogHeader+": Synchronized terrain"); + return; + } + case SymmetricSyncMessage.MsgType.GetObjects: + { + EntityBase[] entities = m_scene.GetEntities(); + foreach (EntityBase e in entities) + { + if (e is SceneObjectGroup) + { + string sogxml = SceneObjectSerializer.ToXml2Format((SceneObjectGroup)e); + SendSyncMessage(SymmetricSyncMessage.MsgType.NewObject, sogxml); + } + } + return; + } + case SymmetricSyncMessage.MsgType.NewObject: + { + SceneObjectGroup sog = SceneObjectSerializer.FromXml2Format(Encoding.ASCII.GetString(msg.Data, 0, msg.Length)); + + //HandleAddOrUpdateObjectInLocalScene(sog, true, true); + HandleAddNewObject(sog); + } + return; + default: + return; + } + } + + private void HandleAddNewObject(SceneObjectGroup sog) + { + if (m_scene.AddNewSceneObject(sog, true)){ + + } + } + + + #endregion //RegionSyncModule members and functions } @@ -474,7 +604,7 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule m_isListening = false; } - // Listen for connections from a new RegionSyncClient + // Listen for connections from a new SyncConnector // When connected, start the ReceiveLoop for the new client private void Listen() { @@ -490,8 +620,8 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule m_log.WarnFormat("[REGION SYNC SERVER] Listening for new connections on {0}:{1}...", m_listenerInfo.Addr.ToString(), m_listenerInfo.Port.ToString()); TcpClient tcpclient = m_listener.AcceptTcpClient(); - //pass the tcpclient information to RegionSyncModule, who will then create a SyncConnector - m_regionSyncModule.AddSyncConnector(tcpclient); + //Create a SynConnector and starts it communication threads + m_regionSyncModule.AddNewSyncConnector(tcpclient); } } catch (SocketException e) diff --git a/OpenSim/Region/CoreModules/RegionSync/RegionSyncModule/SymmetricSync/SyncConnector.cs b/OpenSim/Region/CoreModules/RegionSync/RegionSyncModule/SymmetricSync/SyncConnector.cs index 661b7d7ca3..935d7db7b8 100755 --- a/OpenSim/Region/CoreModules/RegionSync/RegionSyncModule/SymmetricSync/SyncConnector.cs +++ b/OpenSim/Region/CoreModules/RegionSync/RegionSyncModule/SymmetricSync/SyncConnector.cs @@ -3,44 +3,97 @@ */ using System; +using System.IO; using System.Collections.Generic; using System.Net.Sockets; using System.Threading; +using System.Text; using log4net; +using OpenMetaverse; namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule { + // For implementations, a lot was copied from RegionSyncClientView, especially the SendLoop/ReceiveLoop. public class SyncConnector { private TcpClient m_tcpConnection = null; private RegionSyncListenerInfo m_remoteListenerInfo = null; private Thread m_rcvLoop; + private Thread m_send_loop; + private string LogHeader = "[SYNC CONNECTOR]"; // The logfile private ILog m_log; + //members for in/out messages queueing + object stats = new object(); + private long queuedUpdates=0; + private long dequeuedUpdates=0; + private long msgsIn=0; + private long msgsOut=0; + private long bytesIn=0; + private long bytesOut=0; + private int msgCount = 0; + // A queue for outgoing traffic. + private BlockingUpdateQueue m_outQ = new BlockingUpdateQueue(); + + private RegionSyncModule m_regionSyncModule = null; + private int m_connectorNum; public int ConnectorNum { get { return m_connectorNum; } } - public SyncConnector(int connectorNum, TcpClient tcpclient) + //The region name of the other side of the connection + private string m_syncOtherSideRegionName=""; + public string OtherSideRegionName + { + get { return m_syncOtherSideRegionName; } + } + + // Check if the client is connected + public bool Connected + { get { return (m_tcpConnection !=null && m_tcpConnection.Connected); } } + + public string Description + { + get + { + if (m_syncOtherSideRegionName == null) + return String.Format("SyncConnector #{0}", m_connectorNum); + return String.Format("SyncConnector #{0} ({1:10})", m_connectorNum, m_syncOtherSideRegionName); + } + } + + /// + /// The constructor that will be called when a SyncConnector is created passively: a remote SyncConnector has initiated the connection. + /// + /// + /// + public SyncConnector(int connectorNum, TcpClient tcpclient, RegionSyncModule syncModule) { m_tcpConnection = tcpclient; m_connectorNum = connectorNum; + m_regionSyncModule = syncModule; m_log = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType); } - public SyncConnector(int connectorNum, RegionSyncListenerInfo listenerInfo) + /// + /// The constructor that will be called when a SyncConnector is created actively: it is created to send connection request to a remote listener + /// + /// + /// + public SyncConnector(int connectorNum, RegionSyncListenerInfo listenerInfo, RegionSyncModule syncModule) { m_remoteListenerInfo = listenerInfo; m_connectorNum = connectorNum; + m_regionSyncModule = syncModule; m_log = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType); } - //Start the connection - public bool Start() + //Connect to the remote listener + public bool Connect() { m_tcpConnection = new TcpClient(); try @@ -53,16 +106,28 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule m_log.Warn(e.Message); return false; } - - m_rcvLoop = new Thread(new ThreadStart(ReceiveLoop)); - m_rcvLoop.Name = "SyncConnector ReceiveLoop"; - m_log.WarnFormat("{0} Starting {1} thread", LogHeader, m_rcvLoop.Name); - m_rcvLoop.Start(); - return true; } - public void Stop() + /// + /// Start both the send and receive threads + /// + public void StartCommThreads() + { + // Create a thread for the receive loop + m_rcvLoop = new Thread(new ThreadStart(ReceiveLoop)); + m_rcvLoop.Name = Description + " (ReceiveLoop)"; + m_log.WarnFormat("{0} Starting {1} thread", LogHeader, m_rcvLoop.Name); + m_rcvLoop.Start(); + + // Create a thread for the send loop + m_send_loop = new Thread(new ThreadStart(delegate() { SendLoop(); })); + m_send_loop.Name = Description + " (SendLoop)"; + m_log.WarnFormat("{0} Starting {1} thread", LogHeader, m_send_loop.Name); + m_send_loop.Start(); + } + + public void Shutdown() { // The remote scene will remove our avatars automatically when we disconnect //m_rcvLoop.Abort(); @@ -72,24 +137,102 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule m_tcpConnection.Close(); } - // *** This is the main thread loop for each sync connection + /////////////////////////////////////////////////////////// + // Sending messages out to the other side of the connection + /////////////////////////////////////////////////////////// + // Send messages from the update Q as fast as we can DeQueue them + // *** This is the main send loop thread for each connected client + private void SendLoop() + { + try + { + while (true) + { + // Dequeue is thread safe + byte[] update = m_outQ.Dequeue(); + lock (stats) + dequeuedUpdates++; + Send(update); + } + } + catch (Exception e) + { + m_log.ErrorFormat("{0} has disconnected: {1} (SendLoop)", LogHeader, e.Message); + } + Shutdown(); + } + + /// + /// Enqueue update of an object/avatar into the outgoing queue, and return right away + /// + /// UUID of the object/avatar + /// the update infomation in byte format + public void EnqueueOutgoingUpdate(UUID id, byte[] update) + { + lock (stats) + queuedUpdates++; + // Enqueue is thread safe + m_outQ.Enqueue(id, update); + } + + //Send out a messge directly. This should only by called for short messages that are not sent frequently. + //Don't call this function for sending out updates. Call EnqueueOutgoingUpdate instead + public void Send(SymmetricSyncMessage msg) + { + Send(msg.ToBytes()); + } + + private void Send(byte[] data) + { + if (m_tcpConnection.Connected) + { + try + { + lock (stats) + { + msgsOut++; + bytesOut += data.Length; + } + m_tcpConnection.GetStream().BeginWrite(data, 0, data.Length, ar => + { + if (m_tcpConnection.Connected) + { + try + { + m_tcpConnection.GetStream().EndWrite(ar); + } + catch (Exception) + { } + } + }, null); + } + catch (IOException) + { + m_log.WarnFormat("{0}:{1} has disconnected.", LogHeader, m_connectorNum); + } + } + } + + /////////////////////////////////////////////////////////// + // Receiving messages from the other side ofthe connection + /////////////////////////////////////////////////////////// private void ReceiveLoop() { m_log.WarnFormat("{0} Thread running: {1}", LogHeader, m_rcvLoop.Name); while (true && m_tcpConnection.Connected) { - RegionSyncMessage msg; + SymmetricSyncMessage msg; // Try to get the message from the network stream try { - msg = new RegionSyncMessage(m_tcpConnection.GetStream()); + msg = new SymmetricSyncMessage(m_tcpConnection.GetStream()); //m_log.WarnFormat("{0} Received: {1}", LogHeader, msg.ToString()); } // If there is a problem reading from the client, shut 'er down. catch { //ShutdownClient(); - Stop(); + Shutdown(); return; } // Try handling the message @@ -104,9 +247,30 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule } } - private void HandleMessage(RegionSyncMessage msg) + private void HandleMessage(SymmetricSyncMessage msg) { + msgCount++; + switch (msg.Type) + { + case SymmetricSyncMessage.MsgType.RegionName: + { + m_syncOtherSideRegionName = Encoding.ASCII.GetString(msg.Data, 0, msg.Length); + if (m_regionSyncModule.IsSyncRelay) + { + SymmetricSyncMessage outMsg = new SymmetricSyncMessage(SymmetricSyncMessage.MsgType.RegionName, m_regionSyncModule.LocalScene.RegionInfo.RegionName); + Send(outMsg); + } + m_log.DebugFormat("Syncing to region \"{0}\"", m_syncOtherSideRegionName); + return; + } + default: + break; + } + + //For any other messages, we simply deliver the message to RegionSyncModule for now. + //Later on, we may deliver messages to different modules, say sync message to RegionSyncModule and event message to ActorSyncModule. + m_regionSyncModule.HandleIncomingMessage(msg); } } } \ No newline at end of file