Two SyncConnectors now exchange RegionName and Terrian (Scene and Scriptengine tested).

dsg
Huaiyu (Kitty) Liu 2010-12-22 16:56:34 -08:00
parent f97fe18648
commit dc6964444e
2 changed files with 322 additions and 28 deletions

View File

@ -12,10 +12,12 @@ using OpenSim.Framework.Client;
using OpenSim.Region.CoreModules.Framework.InterfaceCommander;
using OpenSim.Region.Framework.Interfaces;
using OpenSim.Region.Framework.Scenes;
using OpenSim.Region.Framework.Scenes.Serialization;
using log4net;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Text;
using Mono.Addins;
@ -240,6 +242,7 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule
}
private IConfig m_sysConfig = null;
private string LogHeader = "[REGION SYNC MODULE]";
//The list of SyncConnectors. ScenePersistence could have multiple SyncConnectors, each connecting to a differerent actor.
//An actor could have several SyncConnectors as well, each connecting to a ScenePersistence that hosts a portion of the objects/avatars
@ -313,6 +316,7 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule
{
GetRemoteSyncListenerInfo();
StartSyncConnections();
DoInitialSync();
}
}
@ -344,21 +348,21 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule
{
foreach (RegionSyncListenerInfo remoteListener in m_remoteSyncListeners)
{
SyncConnector syncConnector = new SyncConnector(m_syncConnectorNum++, remoteListener);
if (syncConnector.Start())
SyncConnector syncConnector = new SyncConnector(m_syncConnectorNum++, remoteListener, this);
if (syncConnector.Connect())
{
syncConnector.StartCommThreads();
AddSyncConnector(syncConnector);
}
}
}
public void AddSyncConnector(TcpClient tcpclient)
//To be called when a SyncConnector needs to be created by that the local listener receives a connection request
public void AddNewSyncConnector(TcpClient tcpclient)
{
//IPAddress addr = ((IPEndPoint)tcpclient.Client.RemoteEndPoint).Address;
//int port = ((IPEndPoint)tcpclient.Client.RemoteEndPoint).Port;
SyncConnector syncConnector = new SyncConnector(m_syncConnectorNum++, tcpclient);
//Create a SynConnector due to an incoming request, and starts its communication threads
SyncConnector syncConnector = new SyncConnector(m_syncConnectorNum++, tcpclient, this);
syncConnector.StartCommThreads();
AddSyncConnector(syncConnector);
}
@ -402,13 +406,139 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule
{
foreach (SyncConnector syncConnector in m_syncConnectors)
{
syncConnector.Stop();
syncConnector.Shutdown();
}
m_syncConnectors.Clear();
}
}
private void DoInitialSync()
{
m_scene.DeleteAllSceneObjects();
SendSyncMessage(SymmetricSyncMessage.MsgType.RegionName, m_scene.RegionInfo.RegionName);
m_log.WarnFormat("Sending region name: \"{0}\"", m_scene.RegionInfo.RegionName);
SendSyncMessage(SymmetricSyncMessage.MsgType.GetTerrain);
//Send(new RegionSyncMessage(RegionSyncMessage.MsgType.GetObjects));
//Send(new RegionSyncMessage(RegionSyncMessage.MsgType.GetAvatars));
//We'll deal with Event a bit later
// Register for events which will be forwarded to authoritative scene
// m_scene.EventManager.OnNewClient += EventManager_OnNewClient;
//m_scene.EventManager.OnMakeRootAgent += EventManager_OnMakeRootAgent;
//m_scene.EventManager.OnMakeChildAgent += EventManager_OnMakeChildAgent;
//m_scene.EventManager.OnClientClosed += new EventManager.ClientClosed(RemoveLocalClient);
}
/// <summary>
/// This function will enqueue a message for each SyncConnector in the connector's outgoing queue.
/// Each SyncConnector has a SendLoop thread to send the messages in its outgoing queue.
/// </summary>
/// <param name="msgType"></param>
/// <param name="data"></param>
private void SendSyncMessage(SymmetricSyncMessage.MsgType msgType, string data)
{
//See RegionSyncClientView for initial implementation by Dan Lake
SymmetricSyncMessage msg = new SymmetricSyncMessage(msgType, data);
ForEachSyncConnector(delegate(SyncConnector syncConnector)
{
syncConnector.Send(msg);
});
}
private void SendSyncMessage(SymmetricSyncMessage.MsgType msgType)
{
//See RegionSyncClientView for initial implementation by Dan Lake
SendSyncMessage(msgType, "");
}
public void ForEachSyncConnector(Action<SyncConnector> action)
{
List<SyncConnector> closed = null;
foreach (SyncConnector syncConnector in m_syncConnectors)
{
// If connected, send the message.
if (syncConnector.Connected)
{
action(syncConnector);
}
// Else, remove the SyncConnector from the list
else
{
if (closed == null)
closed = new List<SyncConnector>();
closed.Add(syncConnector);
}
}
if (closed != null)
{
foreach (SyncConnector connector in closed)
{
RemoveSyncConnector(connector);
}
}
}
/// <summary>
/// The handler for processing incoming sync messages.
/// </summary>
/// <param name="msg"></param>
public void HandleIncomingMessage(SymmetricSyncMessage msg)
{
switch (msg.Type)
{
case SymmetricSyncMessage.MsgType.GetTerrain:
{
SendSyncMessage(SymmetricSyncMessage.MsgType.Terrain, m_scene.Heightmap.SaveToXmlString());
return;
}
case SymmetricSyncMessage.MsgType.Terrain:
{
m_scene.Heightmap.LoadFromXmlString(Encoding.ASCII.GetString(msg.Data, 0, msg.Length));
m_log.Debug(LogHeader+": Synchronized terrain");
return;
}
case SymmetricSyncMessage.MsgType.GetObjects:
{
EntityBase[] entities = m_scene.GetEntities();
foreach (EntityBase e in entities)
{
if (e is SceneObjectGroup)
{
string sogxml = SceneObjectSerializer.ToXml2Format((SceneObjectGroup)e);
SendSyncMessage(SymmetricSyncMessage.MsgType.NewObject, sogxml);
}
}
return;
}
case SymmetricSyncMessage.MsgType.NewObject:
{
SceneObjectGroup sog = SceneObjectSerializer.FromXml2Format(Encoding.ASCII.GetString(msg.Data, 0, msg.Length));
//HandleAddOrUpdateObjectInLocalScene(sog, true, true);
HandleAddNewObject(sog);
}
return;
default:
return;
}
}
private void HandleAddNewObject(SceneObjectGroup sog)
{
if (m_scene.AddNewSceneObject(sog, true)){
}
}
#endregion //RegionSyncModule members and functions
}
@ -474,7 +604,7 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule
m_isListening = false;
}
// Listen for connections from a new RegionSyncClient
// Listen for connections from a new SyncConnector
// When connected, start the ReceiveLoop for the new client
private void Listen()
{
@ -490,8 +620,8 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule
m_log.WarnFormat("[REGION SYNC SERVER] Listening for new connections on {0}:{1}...", m_listenerInfo.Addr.ToString(), m_listenerInfo.Port.ToString());
TcpClient tcpclient = m_listener.AcceptTcpClient();
//pass the tcpclient information to RegionSyncModule, who will then create a SyncConnector
m_regionSyncModule.AddSyncConnector(tcpclient);
//Create a SynConnector and starts it communication threads
m_regionSyncModule.AddNewSyncConnector(tcpclient);
}
}
catch (SocketException e)

View File

@ -3,44 +3,97 @@
*/
using System;
using System.IO;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Threading;
using System.Text;
using log4net;
using OpenMetaverse;
namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule
{
// For implementations, a lot was copied from RegionSyncClientView, especially the SendLoop/ReceiveLoop.
public class SyncConnector
{
private TcpClient m_tcpConnection = null;
private RegionSyncListenerInfo m_remoteListenerInfo = null;
private Thread m_rcvLoop;
private Thread m_send_loop;
private string LogHeader = "[SYNC CONNECTOR]";
// The logfile
private ILog m_log;
//members for in/out messages queueing
object stats = new object();
private long queuedUpdates=0;
private long dequeuedUpdates=0;
private long msgsIn=0;
private long msgsOut=0;
private long bytesIn=0;
private long bytesOut=0;
private int msgCount = 0;
// A queue for outgoing traffic.
private BlockingUpdateQueue m_outQ = new BlockingUpdateQueue();
private RegionSyncModule m_regionSyncModule = null;
private int m_connectorNum;
public int ConnectorNum
{
get { return m_connectorNum; }
}
public SyncConnector(int connectorNum, TcpClient tcpclient)
//The region name of the other side of the connection
private string m_syncOtherSideRegionName="";
public string OtherSideRegionName
{
get { return m_syncOtherSideRegionName; }
}
// Check if the client is connected
public bool Connected
{ get { return (m_tcpConnection !=null && m_tcpConnection.Connected); } }
public string Description
{
get
{
if (m_syncOtherSideRegionName == null)
return String.Format("SyncConnector #{0}", m_connectorNum);
return String.Format("SyncConnector #{0} ({1:10})", m_connectorNum, m_syncOtherSideRegionName);
}
}
/// <summary>
/// The constructor that will be called when a SyncConnector is created passively: a remote SyncConnector has initiated the connection.
/// </summary>
/// <param name="connectorNum"></param>
/// <param name="tcpclient"></param>
public SyncConnector(int connectorNum, TcpClient tcpclient, RegionSyncModule syncModule)
{
m_tcpConnection = tcpclient;
m_connectorNum = connectorNum;
m_regionSyncModule = syncModule;
m_log = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
}
public SyncConnector(int connectorNum, RegionSyncListenerInfo listenerInfo)
/// <summary>
/// The constructor that will be called when a SyncConnector is created actively: it is created to send connection request to a remote listener
/// </summary>
/// <param name="connectorNum"></param>
/// <param name="listenerInfo"></param>
public SyncConnector(int connectorNum, RegionSyncListenerInfo listenerInfo, RegionSyncModule syncModule)
{
m_remoteListenerInfo = listenerInfo;
m_connectorNum = connectorNum;
m_regionSyncModule = syncModule;
m_log = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
}
//Start the connection
public bool Start()
//Connect to the remote listener
public bool Connect()
{
m_tcpConnection = new TcpClient();
try
@ -53,16 +106,28 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule
m_log.Warn(e.Message);
return false;
}
m_rcvLoop = new Thread(new ThreadStart(ReceiveLoop));
m_rcvLoop.Name = "SyncConnector ReceiveLoop";
m_log.WarnFormat("{0} Starting {1} thread", LogHeader, m_rcvLoop.Name);
m_rcvLoop.Start();
return true;
}
public void Stop()
/// <summary>
/// Start both the send and receive threads
/// </summary>
public void StartCommThreads()
{
// Create a thread for the receive loop
m_rcvLoop = new Thread(new ThreadStart(ReceiveLoop));
m_rcvLoop.Name = Description + " (ReceiveLoop)";
m_log.WarnFormat("{0} Starting {1} thread", LogHeader, m_rcvLoop.Name);
m_rcvLoop.Start();
// Create a thread for the send loop
m_send_loop = new Thread(new ThreadStart(delegate() { SendLoop(); }));
m_send_loop.Name = Description + " (SendLoop)";
m_log.WarnFormat("{0} Starting {1} thread", LogHeader, m_send_loop.Name);
m_send_loop.Start();
}
public void Shutdown()
{
// The remote scene will remove our avatars automatically when we disconnect
//m_rcvLoop.Abort();
@ -72,24 +137,102 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule
m_tcpConnection.Close();
}
// *** This is the main thread loop for each sync connection
///////////////////////////////////////////////////////////
// Sending messages out to the other side of the connection
///////////////////////////////////////////////////////////
// Send messages from the update Q as fast as we can DeQueue them
// *** This is the main send loop thread for each connected client
private void SendLoop()
{
try
{
while (true)
{
// Dequeue is thread safe
byte[] update = m_outQ.Dequeue();
lock (stats)
dequeuedUpdates++;
Send(update);
}
}
catch (Exception e)
{
m_log.ErrorFormat("{0} has disconnected: {1} (SendLoop)", LogHeader, e.Message);
}
Shutdown();
}
/// <summary>
/// Enqueue update of an object/avatar into the outgoing queue, and return right away
/// </summary>
/// <param name="id">UUID of the object/avatar</param>
/// <param name="update">the update infomation in byte format</param>
public void EnqueueOutgoingUpdate(UUID id, byte[] update)
{
lock (stats)
queuedUpdates++;
// Enqueue is thread safe
m_outQ.Enqueue(id, update);
}
//Send out a messge directly. This should only by called for short messages that are not sent frequently.
//Don't call this function for sending out updates. Call EnqueueOutgoingUpdate instead
public void Send(SymmetricSyncMessage msg)
{
Send(msg.ToBytes());
}
private void Send(byte[] data)
{
if (m_tcpConnection.Connected)
{
try
{
lock (stats)
{
msgsOut++;
bytesOut += data.Length;
}
m_tcpConnection.GetStream().BeginWrite(data, 0, data.Length, ar =>
{
if (m_tcpConnection.Connected)
{
try
{
m_tcpConnection.GetStream().EndWrite(ar);
}
catch (Exception)
{ }
}
}, null);
}
catch (IOException)
{
m_log.WarnFormat("{0}:{1} has disconnected.", LogHeader, m_connectorNum);
}
}
}
///////////////////////////////////////////////////////////
// Receiving messages from the other side ofthe connection
///////////////////////////////////////////////////////////
private void ReceiveLoop()
{
m_log.WarnFormat("{0} Thread running: {1}", LogHeader, m_rcvLoop.Name);
while (true && m_tcpConnection.Connected)
{
RegionSyncMessage msg;
SymmetricSyncMessage msg;
// Try to get the message from the network stream
try
{
msg = new RegionSyncMessage(m_tcpConnection.GetStream());
msg = new SymmetricSyncMessage(m_tcpConnection.GetStream());
//m_log.WarnFormat("{0} Received: {1}", LogHeader, msg.ToString());
}
// If there is a problem reading from the client, shut 'er down.
catch
{
//ShutdownClient();
Stop();
Shutdown();
return;
}
// Try handling the message
@ -104,9 +247,30 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule
}
}
private void HandleMessage(RegionSyncMessage msg)
private void HandleMessage(SymmetricSyncMessage msg)
{
msgCount++;
switch (msg.Type)
{
case SymmetricSyncMessage.MsgType.RegionName:
{
m_syncOtherSideRegionName = Encoding.ASCII.GetString(msg.Data, 0, msg.Length);
if (m_regionSyncModule.IsSyncRelay)
{
SymmetricSyncMessage outMsg = new SymmetricSyncMessage(SymmetricSyncMessage.MsgType.RegionName, m_regionSyncModule.LocalScene.RegionInfo.RegionName);
Send(outMsg);
}
m_log.DebugFormat("Syncing to region \"{0}\"", m_syncOtherSideRegionName);
return;
}
default:
break;
}
//For any other messages, we simply deliver the message to RegionSyncModule for now.
//Later on, we may deliver messages to different modules, say sync message to RegionSyncModule and event message to ActorSyncModule.
m_regionSyncModule.HandleIncomingMessage(msg);
}
}
}