Reimplement logic in RegionSyncModule so that a SyncRelay node (e.g. PSA) only sends out updates for a bucket

that has properties updated in the last update interval: the previous implemenation has some flaws in design.
dsg
Huaiyu (Kitty) Liu 2011-03-15 11:21:46 -07:00
parent 4803745c0d
commit 3d17bd5654
4 changed files with 127 additions and 46 deletions

View File

@ -199,11 +199,11 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule
foreach (string bucketName in m_propertyBucketNames)
{
//if (m_isSyncRelay || part.HasPropertyUpdatedLocallyInGivenBucket(bucketName))
if(ToSendoutUpdate(part, bucketName))
if(HaveUpdatesToSendoutForSync(part, bucketName))
{
lock (m_primUpdateLocks[bucketName])
{
m_log.Debug("Queueing to bucket " + bucketName + " with part " + part.Name + ", " + part.UUID);
//m_log.Debug("Queueing to bucket " + bucketName + " with part " + part.Name + ", " + part.UUID+" at pos "+part.GroupPosition.ToString());
m_primUpdates[bucketName][part.UUID] = part;
}
}
@ -387,7 +387,7 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule
//no SyncConnector connected. Do nothing.
return;
}
m_log.DebugFormat(LogHeader + "SendNewObject called for object {0}, {1}", sog.Name, sog.UUID);
//m_log.DebugFormat(LogHeader + "SendNewObject called for object {0}, {1}", sog.Name, sog.UUID);
string sogxml = SceneObjectSerializer.ToXml2Format(sog);
SymmetricSyncMessage rsm = new SymmetricSyncMessage(SymmetricSyncMessage.MsgType.NewObject, sogxml);
@ -409,7 +409,7 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule
return;
}
m_log.DebugFormat(LogHeader+"SendDeleteObject called for object {0}", sog.UUID);
//m_log.DebugFormat(LogHeader+"SendDeleteObject called for object {0}", sog.UUID);
//Only send the message out if this is a relay node for sync messages, or this actor caused deleting the object
//if (m_isSyncRelay || CheckObjectForSendingUpdate(sog))
@ -737,27 +737,31 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule
//Going forward, we may serialize the properties differently, e.g. using OSDMap
private void PrimUpdatesGeneralBucketSender(string bucketName, List<SceneObjectPart> primUpdates)
{
UpdateBucektLastSentTime(bucketName);
Dictionary<UUID, SceneObjectGroup> updatedObjects = new Dictionary<UUID, SceneObjectGroup>();
foreach (SceneObjectPart part in primUpdates)
{
updatedObjects[part.ParentGroup.UUID] = part.ParentGroup;
}
long timeStamp = DateTime.Now.Ticks;
foreach (SceneObjectGroup sog in updatedObjects.Values)
{
sog.UpdateTaintedBucketSyncInfo(bucketName, DateTime.Now.Ticks); //this update the timestamp and clear the taint info of the bucket
sog.UpdateTaintedBucketSyncInfo(bucketName, timeStamp); //this update the timestamp and clear the taint info of the bucket
string sogxml = SceneObjectSerializer.ToXml2Format(sog);
SymmetricSyncMessage syncMsg = new SymmetricSyncMessage(SymmetricSyncMessage.MsgType.UpdatedObject, sogxml);
SendObjectUpdateToRelevantSyncConnectors(sog, syncMsg);
//clear the taints
foreach (SceneObjectPart part in sog.Parts)
{
part.BucketSyncInfoList[bucketName].ClearBucketTaintBySync();
}
}
//UpdateBucektLastSentTime(bucketName, timeStamp);
}
private void PrimUpdatesPhysicsBucketSender(string bucketName, List<SceneObjectPart> primUpdates)
{
UpdateBucektLastSentTime(bucketName);
foreach (SceneObjectPart updatedPart in primUpdates)
{
updatedPart.UpdateTaintedBucketSyncInfo(bucketName, DateTime.Now.Ticks);
@ -792,38 +796,48 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule
data["Buoyancy"] = OSD.FromReal(pa.Buoyancy);
data["CollidingGround"] = OSD.FromBoolean(pa.CollidingGround);
data["IsColliding"] = OSD.FromBoolean(pa.IsColliding);
m_log.DebugFormat("{0}: PhysBucketSender for {1}, pos={2}", LogHeader, updatedPart.UUID.ToString(), pa.Position.ToString());
}
data["LastUpdateTimeStamp"] = OSD.FromLong(updatedPart.BucketSyncInfoList[bucketName].LastUpdateTimeStamp);
data["LastUpdateActorID"] = OSD.FromString(updatedPart.BucketSyncInfoList[bucketName].LastUpdateActorID);
//m_log.Debug(LogHeader + " Send out Physics Bucket updates for " + updatedPart.Name + ". GroupPosition: " + updatedPart.GroupPosition.ToString());
//m_log.Debug(LogHeader + " Send out Physics Bucket updates for " + updatedPart.Name + ","+updatedPart.UUID+ ". GroupPosition: " + updatedPart.GroupPosition.ToString());
SymmetricSyncMessage syncMsg = new SymmetricSyncMessage(SymmetricSyncMessage.MsgType.UpdatedBucketProperties, OSDParser.SerializeJsonString(data));
//m_log.DebugFormat("{0}: PhysBucketSender for {1}, pos={2}", LogHeader, updatedPart.UUID.ToString(), pa.Position.ToString());
SendObjectUpdateToRelevantSyncConnectors(updatedPart, syncMsg);
//clear the taints
updatedPart.BucketSyncInfoList[bucketName].ClearBucketTaintBySync();
}
//UpdateBucektLastSentTime(bucketName);
}
/*
private void UpdateBucektLastSentTime(string bucketName)
{
long timeStamp = DateTime.Now.Ticks;
/*
if (m_lastUpdateSentTime.ContainsKey(bucketName))
{
m_lastUpdateSentTime[bucketName] = timeStamp;
}
*/
private bool HaveUpdatesToSendoutForSync(SceneObjectPart part, string bucketName)
{
if (m_isSyncRelay)
{
return (part.HasPropertyUpdatedLocally(bucketName) || part.HasPropertyUpdatedBySync(bucketName));
}
else
{
m_lastUpdateSentTime.Add(bucketName, timeStamp);
}
* */
m_lastUpdateSentTime[bucketName] = timeStamp;
return part.HasPropertyUpdatedLocally(bucketName);
}
private bool ToSendoutUpdate(SceneObjectPart part, string bucketName)
{
//return (m_isSyncRelay || part.HasPropertyUpdatedLocallyInGivenBucket(bucketName));
/*
if (!m_isSyncRelay)
{
return part.HasPropertyUpdatedLocallyInGivenBucket(bucketName);
@ -834,6 +848,7 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule
return true;
else
return false;
* */
}
//If nothing configured in the config file, this is the default settings for grouping properties into different bucket
@ -909,6 +924,8 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule
//string sogxml = SceneObjectSerializer.ToXml2Format(sog);
//SymmetricSyncMessage syncMsg = new SymmetricSyncMessage(SymmetricSyncMessage.MsgType.UpdatedObject, sogxml);
//m_log.Debug("Send " + syncMsg.Type.ToString() + " about sog "+sog.Name+","+sog.UUID+ " at pos "+sog.AbsolutePosition.ToString()+" to " + connector.OtherSideActorID);
connector.EnqueueOutgoingUpdate(sog.UUID, syncMsg.ToBytes());
}
}
@ -922,6 +939,10 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule
{
//string sogxml = SceneObjectSerializer.ToXml2Format(sog);
//SymmetricSyncMessage syncMsg = new SymmetricSyncMessage(SymmetricSyncMessage.MsgType.UpdatedObject, sogxml);
//m_log.Debug("Send " + syncMsg.Type.ToString() + " about sop " + updatedPart.Name + "," + updatedPart.UUID + " at pos "+updatedPart.GroupPosition.ToString()
//+" to " + connector.OtherSideActorID);
connector.EnqueueOutgoingUpdate(updatedPart.UUID, syncMsg.ToBytes());
}
}
@ -1272,6 +1293,20 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule
}
m_log.WarnFormat("SyncStateReport -- Object count: {0}, Prim Count {1} ", sogList.Count, primCount);
foreach (SceneObjectGroup sog in sogList)
{
m_log.WarnFormat("SyncStateReport -- SOG: name {0}, UUID {1}, position {2}", sog.Name, sog.UUID, sog.AbsolutePosition);
}
if (m_isSyncRelay)
{
SymmetricSyncMessage msg = new SymmetricSyncMessage(SymmetricSyncMessage.MsgType.SyncStateReport);
ForEachSyncConnector(delegate(SyncConnector connector)
{
connector.Send(msg);
});
}
}
private void SyncDebug(Object[] args)
@ -1560,6 +1595,11 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule
HandleRemoteEvent(msg, senderActorID);
return;
}
case SymmetricSyncMessage.MsgType.SyncStateReport:
{
SyncStateReport(null);
return;
}
default:
return;
}
@ -1591,9 +1631,16 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule
string sogxml = Encoding.ASCII.GetString(msg.Data, 0, msg.Length);
SceneObjectGroup sog = SceneObjectSerializer.FromXml2Format(sogxml);
m_log.DebugFormat("{0}: received NewObject sync message from {1}, for object {1}, {2}", LogHeader, senderActorID, sog.Name, sog.UUID);
//m_log.DebugFormat("{0}: received NewObject sync message from {1}, for object {1}, {2}", LogHeader, senderActorID, sog.Name, sog.UUID);
Scene.ObjectUpdateResult updateResult = m_scene.AddOrUpdateObjectBySynchronization(sog);
//if this is a relay node, forwards the event
if (m_isSyncRelay)
{
//SendSceneEventToRelevantSyncConnectors(init_actorID, msg);
SendSceneEventToRelevantSyncConnectors(senderActorID, msg);
}
}
private void HandleAddOrUpdateObjectBySynchronization(SymmetricSyncMessage msg, string senderActorID)
@ -1608,7 +1655,8 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule
{
partnames += "(" + part.Name + ", " + part.UUID + ")";
}
m_log.Debug(LogHeader+" received "+msg.Type.ToString()+" from "+senderActorID+" about obj "+sog.Name+", "+sog.UUID+"; parts -- "+partnames);
m_log.Debug(LogHeader+" received "+msg.Type.ToString()+" from "+senderActorID+" about obj "+sog.Name+", "+sog.UUID);//+"; parts -- "+partnames);
* */
if (sog.IsDeleted)
@ -1653,7 +1701,7 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule
UUID partUUID = data["UUID"].AsUUID();
string bucketName = data["Bucket"].AsString();
//m_log.DebugFormat("{0}: HandleUpdatedBucketProperties {1}: for {2}/{3}", LogHeader, senderActorID, partUUID.ToString(), bucketName);
//m_log.DebugFormat("{0}: HandleUpdatedBucketProperties from {1}: for {2}/{3}", LogHeader, senderActorID, partUUID.ToString(), bucketName);
/* Commented out since OSDMap is now passed all the way through to the unpacker.
* Previous implementation is to create a SOP and copy the values into same and copy them out later.

View File

@ -47,6 +47,8 @@ namespace OpenSim.Region.CoreModules.RegionSync.RegionSyncModule
ObjectGrabbing,
ObjectDeGrab,
Attach,
//contorl command
SyncStateReport,
}
#endregion

View File

@ -2026,7 +2026,7 @@ namespace OpenSim.Region.Framework.Scenes
}
else
{
m_log.Debug(updatedSog.Name+" "+updatedSog.UUID+" not found in Entities list. Need to add");
//m_log.Debug(updatedSog.Name+" "+updatedSog.UUID+" not found in Entities list. Need to add");
AddSceneObjectByStateSynch(updatedSog);
updateResult = Scene.ObjectUpdateResult.New;
}
@ -2070,8 +2070,6 @@ namespace OpenSim.Region.Framework.Scenes
}
m_numPrim += children.Length;
m_log.Debug("Attached obj " + sceneObject.Name + "," + sceneObject.UUID + " to Scene");
sceneObject.AttachToScene(m_parentScene);
//Take some special care of the case of this object being an attachment

View File

@ -2795,7 +2795,7 @@ namespace OpenSim.Region.Framework.Scenes
m_parentGroup.AbsolutePosition = newpos;
return;
}
m_log.DebugFormat("[PHYSICS]: TerseUpdate: UUID={0}, newpos={1}", PhysActor.UUID.ToString(), newpos.ToString());
//m_log.DebugFormat("[PHYSICS]: TerseUpdate: UUID={0}, newpos={1}", PhysActor.UUID.ToString(), newpos.ToString());
//m_parentGroup.RootPart.m_groupPosition = newpos;
}
//ScheduleTerseUpdate();
@ -5003,7 +5003,8 @@ namespace OpenSim.Region.Framework.Scenes
//lock for concurrent updates of the timestamp and actorID.
private Object m_updateLock = new Object();
private string m_bucketName;
private bool m_bucketTainted = false;
private bool m_bucketLocallyTainted = false; //indicating if the bucket has been tainted by local write operations
private bool m_bucketTaintedBySync = false; //indicating if the bucket has been tainted by remote write operations (propogated by synchronization)
public long LastUpdateTimeStamp
{
@ -5022,9 +5023,14 @@ namespace OpenSim.Region.Framework.Scenes
get { return m_bucketName; }
}
public bool Tainted
public bool LocallyTainted
{
get { return m_bucketTainted; }
get { return m_bucketLocallyTainted; }
}
public bool TaintedBySync
{
get { return m_bucketTaintedBySync; }
}
public BucketSyncInfo(string bucketName)
@ -5045,17 +5051,27 @@ namespace OpenSim.Region.Framework.Scenes
{
m_lastUpdateTimeStamp = timeStamp;
m_lastUpdateActorID = actorID;
m_bucketTainted = false; //clear taint
m_bucketLocallyTainted = false; //clear taint
}
}
public void TaintBucket()
public void TaintBucketLocally()
{
lock (m_updateLock)
{
m_bucketTainted = true;
m_bucketLocallyTainted = true;
}
}
public void TaintBucketBySync()
{
m_bucketTaintedBySync = true;
}
public void ClearBucketTaintBySync()
{
m_bucketTaintedBySync = false;
}
}
/*
@ -5497,6 +5513,8 @@ namespace OpenSim.Region.Framework.Scenes
// are kept the same as in the received copy of the object.
ScheduleFullUpdate(null);
//Mark the bucket as having been tainted by sync operations
m_bucketSyncInfoList[bucketName].TaintBucketBySync();
}
@ -5542,6 +5560,9 @@ namespace OpenSim.Region.Framework.Scenes
localPart.AngularVelocity = data["AngularVelocity"].AsVector3();
localPart.RotationOffset = data["RotationOffset"].AsQuaternion();
//m_log.Debug("Received Physics Bucket updates for " + localPart.Name + ","+localPart.UUID
// + ". GroupPosition: " + data["GroupPosition"].AsVector3().ToString());
if (pa != null)
{
pa.Size = data["Size"].AsVector3();
@ -5558,6 +5579,8 @@ namespace OpenSim.Region.Framework.Scenes
pa.Buoyancy = (float)data["Buoyancy"].AsReal();
pa.CollidingGround = data["CollidingGround"].AsBoolean();
pa.IsColliding = data["IsColliding"].AsBoolean();
// m_log.DebugFormat("{0}: PhysicsBucketUpdateProcessor for {2},{3}. pos={1}", , data["Position"].AsVector3().ToString(), localPart.Name, localPart.UUID);
}
m_bucketSyncInfoList[bucketName].LastUpdateTimeStamp = data["LastUpdateTimeStamp"].AsLong();
@ -5574,6 +5597,8 @@ namespace OpenSim.Region.Framework.Scenes
ScheduleFullUpdate(null);
//Mark the bucket as having been tainted by sync operations
m_bucketSyncInfoList[bucketName].TaintBucketBySync();
}
//Initialize and set the values of timestamp and actorID for each synchronization bucket.
@ -5628,27 +5653,31 @@ namespace OpenSim.Region.Framework.Scenes
{
foreach (BucketSyncInfo bucketSynInfo in m_bucketSyncInfoList.Values)
{
bucketSynInfo.TaintBucket();
bucketSynInfo.TaintBucketLocally();
}
}
else
{
string bucketName = m_primPropertyBucketMap[property];
//m_bucketSyncTainted[bucketName] = true;
m_bucketSyncInfoList[bucketName].TaintBucket();
m_bucketSyncInfoList[bucketName].TaintBucketLocally();
m_log.Debug(this.Name + ": " + property.ToString() + " just changed. Tainted " + bucketName);
// m_log.Debug(this.Name + ": " + property.ToString() + " just changed. Tainted " + bucketName);
}
}
}
public bool HasPropertyUpdatedLocallyInGivenBucket(string bucketName)
public bool HasPropertyUpdatedLocally(string bucketName)
{
return m_bucketSyncInfoList[bucketName].Tainted;
return m_bucketSyncInfoList[bucketName].LocallyTainted;
}
public bool HasPropertyUpdatedBySync(string bucketName)
{
return m_bucketSyncInfoList[bucketName].TaintedBySync;
}
/// <summary>
/// Update the timestamp information of each property bucket, and clear out the taint on each bucket.
@ -5661,7 +5690,7 @@ namespace OpenSim.Region.Framework.Scenes
foreach (KeyValuePair<string, BucketSyncInfo> pair in m_bucketSyncInfoList)
{
string bucketName = pair.Key;
if (m_bucketSyncInfoList[bucketName].Tainted)
if (m_bucketSyncInfoList[bucketName].LocallyTainted)
{
m_bucketSyncInfoList[bucketName].UpdateSyncInfoAndClearTaint(timeStamp, m_localActorID);
}
@ -5682,7 +5711,7 @@ namespace OpenSim.Region.Framework.Scenes
foreach (KeyValuePair<string, BucketSyncInfo> pair in m_bucketSyncInfoList)
{
string bucketName = pair.Key;
if (m_bucketSyncInfoList[bucketName].Tainted)
if (m_bucketSyncInfoList[bucketName].LocallyTainted)
{
m_bucketSyncInfoList[bucketName].UpdateSyncInfoAndClearTaint(timeStamp, m_localActorID);
}
@ -5694,7 +5723,7 @@ namespace OpenSim.Region.Framework.Scenes
{
if (m_syncEnabled)
{
if (m_bucketSyncInfoList[bucketName].Tainted)
if (m_bucketSyncInfoList[bucketName].LocallyTainted)
{
m_bucketSyncInfoList[bucketName].UpdateSyncInfoAndClearTaint(timeStamp, m_localActorID);
}
@ -5769,8 +5798,10 @@ namespace OpenSim.Region.Framework.Scenes
{
if (!m_bucketSyncInfoList[bucketName].LastUpdateActorID.Equals(updatedPart.BucketSyncInfoList[bucketName].LastUpdateActorID))
{
m_log.Warn("Different actors modified SceneObjetPart " + UUID + " with the same TimeStamp (" + m_bucketSyncInfoList[bucketName].LastUpdateActorID
+ "," + updatedPart.BucketSyncInfoList[bucketName].LastUpdateActorID + ", CONFLICT RESOLUTION TO BE IMPLEMENTED, PICK A WINNER!!!!");
m_log.Warn("Different actors modified SceneObjetPart " + Name+"," +UUID + ", bucket "+bucketName+", with the same TimeStamp ("
+ m_bucketSyncInfoList[bucketName].LastUpdateActorID
+ "," + updatedPart.BucketSyncInfoList[bucketName].LastUpdateActorID +
", CONFLICT RESOLUTION TO BE IMPLEMENTED, PICK A WINNER!!!!");
}
//TODO: conflict resolution to be implemented -- pick a winner
continue;
@ -5823,8 +5854,10 @@ namespace OpenSim.Region.Framework.Scenes
{
if (!m_bucketSyncInfoList[bucketName].LastUpdateActorID.Equals(rBucketSyncInfo.LastUpdateActorID))
{
m_log.Warn("Different actors modified SceneObjetPart " + UUID + " with the same TimeStamp (" + m_bucketSyncInfoList[bucketName].LastUpdateActorID
+ "," + rBucketSyncInfo.LastUpdateActorID + ", CONFLICT RESOLUTION TO BE IMPLEMENTED, PICK A WINNER!!!!");
m_log.Warn("Different actors modified SceneObjetPart " + Name + "," + UUID + ", bucket " + bucketName + ", with the same TimeStamp ("
+ m_bucketSyncInfoList[bucketName].LastUpdateActorID
+ "," + rBucketSyncInfo.LastUpdateActorID +
", CONFLICT RESOLUTION TO BE IMPLEMENTED, PICK A WINNER!!!!");
}
//TODO: conflict resolution to be implemented -- pick a winner
return partUpdateResult;