From c1b8283ad139b6cbd947b6f6dada6128e99e696d Mon Sep 17 00:00:00 2001 From: MW Date: Mon, 5 Mar 2007 18:10:21 +0000 Subject: [PATCH] Assets and the asset cache should be working when run in grid mode (but needs testing), now just need to implement a local assetserver. --- Assets/AssetCache.cs | 171 +++++++++++++++++++++++++++++++++--- GridServers/IAssetServer.cs | 107 ++++++++++++++++++++-- Main.cs | 7 +- OpenSimClient.cs | 64 +------------- 4 files changed, 263 insertions(+), 86 deletions(-) diff --git a/Assets/AssetCache.cs b/Assets/AssetCache.cs index 47f1842ee3..2874a25b1e 100644 --- a/Assets/AssetCache.cs +++ b/Assets/AssetCache.cs @@ -27,6 +27,7 @@ using System; using System.Collections.Generic; +using System.Threading; using libsecondlife; using libsecondlife.Packets; using OpenSim; @@ -37,7 +38,7 @@ namespace OpenSim.Assets /// /// Manages local cache of assets and their sending to viewers. /// - public class AssetCache : IAssetReceived + public class AssetCache : IAssetReceiver { public Dictionary Assets; public Dictionary Textures; @@ -45,10 +46,11 @@ namespace OpenSim.Assets public List AssetRequests = new List(); //assets ready to be sent to viewers public List TextureRequests = new List(); //textures ready to be sent - public List RequestedAssets = new List(); //Assets requested from the asset server - public List RequestedTextures = new List(); //Textures requested from the asset server + public Dictionary RequestedAssets = new Dictionary(); //Assets requested from the asset server + public Dictionary RequestedTextures = new Dictionary(); //Textures requested from the asset server private IAssetServer _assetServer; + private Thread _assetCacheThread; /// /// @@ -57,6 +59,9 @@ namespace OpenSim.Assets { _assetServer = assetServer; _assetServer.SetReceiver(this); + this._assetCacheThread = new Thread( new ThreadStart(RunAssetManager)); + this._assetCacheThread.IsBackground = true; + this._assetCacheThread.Start(); } /// @@ -64,8 +69,10 @@ namespace OpenSim.Assets /// private void RunAssetManager() { + //should be running in its own thread this.ProcessAssetQueue(); this.ProcessTextureQueue(); + Thread.Sleep(100); } /// @@ -79,7 +86,6 @@ namespace OpenSim.Assets return; } int num; - //should be running in its own thread but for now is called by timer if(this.TextureRequests.Count < 5) { @@ -157,12 +163,36 @@ namespace OpenSim.Assets } } - public void AssetReceived(AssetBase asset) + public void AssetReceived(AssetBase asset, bool IsTexture) { //check if it is a texture or not //then add to the correct cache list //then check for waiting requests for this asset/texture (in the Requested lists) //and move those requests into the Requests list. + if(IsTexture) + { + TextureImage image = new TextureImage(asset); + this.Textures.Add(image.FullID, image); + if(this.RequestedTextures.ContainsKey(image.FullID)) + { + AssetRequest req = this.RequestedTextures[image.FullID]; + req.ImageInfo = image; + this.RequestedTextures.Remove(image.FullID); + this.TextureRequests.Add(req); + } + } + else + { + AssetInfo assetInf = new AssetInfo(asset); + this.Assets.Add(assetInf.FullID, assetInf); + if(this.RequestedAssets.ContainsKey(assetInf.FullID)) + { + AssetRequest req = this.RequestedAssets[assetInf.FullID]; + req.AssetInf = assetInf; + this.RequestedAssets.Remove(assetInf.FullID); + this.AssetRequests.Add(req); + } + } } public void AssetNotFound(AssetBase asset) @@ -187,16 +217,34 @@ namespace OpenSim.Assets // so request from asset server AssetRequest request = new AssetRequest(); request.RequestUser = userInfo; - request.RequestImage = requestID; - this.AssetRequests.Add(request); - this._assetServer.RequestAsset(requestID); + request.RequestAssetID = requestID; + request.TransferRequestID = transferRequest.TransferInfo.TransferID; + this.RequestedAssets.Add(requestID,request); + this._assetServer.RequestAsset(requestID, false); return; } //it is in our cache - AssetInfo info = this.Assets[requestID]; + AssetInfo asset = this.Assets[requestID]; //work out how many packets it should be sent in // and add to the AssetRequests list + AssetRequest req = new AssetRequest(); + req.RequestUser = userInfo; + req.RequestAssetID = requestID; + req.TransferRequestID = transferRequest.TransferInfo.TransferID; + req.AssetInf = asset; + + if(asset.Data.LongLength>600) + { + //over 600 bytes so split up file + req.NumPackets = 1 + (int)(asset.Data.Length-600+999)/1000; + } + else + { + req.NumPackets = 1; + } + + this.AssetRequests.Add(req); } /// @@ -204,6 +252,77 @@ namespace OpenSim.Assets /// private void ProcessAssetQueue() { + if(this.AssetRequests.Count == 0) + { + //no requests waiting + return; + } + int num; + + if(this.AssetRequests.Count < 5) + { + //lower than 5 so do all of them + num = this.AssetRequests.Count; + } + else + { + num=5; + } + AssetRequest req; + for(int i = 0; i < num; i++) + { + req=(AssetRequest)this.AssetRequests[i]; + + TransferInfoPacket Transfer = new TransferInfoPacket(); + Transfer.TransferInfo.ChannelType = 2; + Transfer.TransferInfo.Status = 0; + Transfer.TransferInfo.TargetType = 0; + Transfer.TransferInfo.Params = req.RequestAssetID.GetBytes(); + Transfer.TransferInfo.Size = (int)req.AssetInf.Data.Length; + Transfer.TransferInfo.TransferID = req.TransferRequestID; + req.RequestUser.OutPacket(Transfer); + + if(req.NumPackets == 1) + { + TransferPacketPacket TransferPacket = new TransferPacketPacket(); + TransferPacket.TransferData.Packet = 0; + TransferPacket.TransferData.ChannelType = 2; + TransferPacket.TransferData.TransferID=req.TransferRequestID; + TransferPacket.TransferData.Data = req.AssetInf.Data; + TransferPacket.TransferData.Status = 1; + req.RequestUser.OutPacket(TransferPacket); + } + else + { + //more than one packet so split file up , for now it can't be bigger than 2000 bytes + TransferPacketPacket TransferPacket = new TransferPacketPacket(); + TransferPacket.TransferData.Packet = 0; + TransferPacket.TransferData.ChannelType = 2; + TransferPacket.TransferData.TransferID=req.TransferRequestID; + byte[] chunk = new byte[1000]; + Array.Copy(req.AssetInf.Data,chunk,1000); + TransferPacket.TransferData.Data = chunk; + TransferPacket.TransferData.Status = 0; + req.RequestUser.OutPacket(TransferPacket); + + TransferPacket = new TransferPacketPacket(); + TransferPacket.TransferData.Packet = 1; + TransferPacket.TransferData.ChannelType = 2; + TransferPacket.TransferData.TransferID = req.TransferRequestID; + byte[] chunk1 = new byte[(req.AssetInf.Data.Length-1000)]; + Array.Copy(req.AssetInf.Data, 1000, chunk1, 0, chunk1.Length); + TransferPacket.TransferData.Data = chunk1; + TransferPacket.TransferData.Status = 1; + req.RequestUser.OutPacket(TransferPacket); + } + + } + + //remove requests that have been completed + for(int i = 0; i < num; i++) + { + this.AssetRequests.RemoveAt(i); + } } @@ -223,15 +342,17 @@ namespace OpenSim.Assets //not is cache so request from asset server AssetRequest request = new AssetRequest(); request.RequestUser = userInfo; - request.RequestImage = imageID; - this.TextureRequests.Add(request); - this._assetServer.RequestAsset(imageID); + request.RequestAssetID = imageID; + request.IsTextureRequest = true; + this.RequestedTextures.Add(imageID, request); + this._assetServer.RequestAsset(imageID, true); return; } TextureImage imag = this.Textures[imageID]; AssetRequest req = new AssetRequest(); req.RequestUser = userInfo; - req.RequestImage = imageID; + req.RequestAssetID = imageID; + req.IsTextureRequest = true; req.ImageInfo = imag; if(imag.Data.LongLength>600) @@ -253,12 +374,14 @@ namespace OpenSim.Assets public class AssetRequest { public OpenSimClient RequestUser; - public LLUUID RequestImage; + public LLUUID RequestAssetID; public AssetInfo AssetInf; public TextureImage ImageInfo; + public LLUUID TransferRequestID; public long DataPointer = 0; public int NumPackets = 0; public int PacketCounter = 0; + public bool IsTextureRequest; //public bool AssetInCache; //public int TimeRequested; @@ -289,6 +412,16 @@ namespace OpenSim.Assets { } + + public AssetInfo(AssetBase aBase) + { + Data= aBase.Data; + FullID = aBase.FullID; + Type = aBase.Type; + InvType = aBase.InvType; + Name= aBase.Name; + Description = aBase.Description; + } } public class TextureImage : AssetBase @@ -297,6 +430,16 @@ namespace OpenSim.Assets { } + + public TextureImage(AssetBase aBase) + { + Data= aBase.Data; + FullID = aBase.FullID; + Type = aBase.Type; + InvType = aBase.InvType; + Name= aBase.Name; + Description = aBase.Description; + } } } diff --git a/GridServers/IAssetServer.cs b/GridServers/IAssetServer.cs index f6141b96a5..4287f16a99 100644 --- a/GridServers/IAssetServer.cs +++ b/GridServers/IAssetServer.cs @@ -26,6 +26,10 @@ */ using System; +using System.Net; +using System.Net.Sockets; +using System.IO; +using System.Threading; using libsecondlife; using OpenSim.Assets; @@ -36,40 +40,125 @@ namespace OpenSim.GridServers /// public class LocalAssetServer : IAssetServer { + private IAssetReceiver _receiver; + private BlockingQueue _assetRequests; + public LocalAssetServer() { + this._assetRequests = new BlockingQueue(); } - public void SetReceiver(IAssetReceived receiver) + public void SetReceiver(IAssetReceiver receiver) { - + this._receiver = receiver; } - public void RequestAsset(LLUUID assetID) + + public void RequestAsset(LLUUID assetID, bool isTexture) { - + ARequest req = new ARequest(); + req.AssetID = assetID; + req.IsTexture = isTexture; + this._assetRequests.Enqueue(req); } + public void UpdateAsset(AssetBase asset) { } + public void UploadNewAsset(AssetBase asset) { } + + private void RunRequests() + { + while(true) + { + + } + } + } + + public class RemoteAssetServer : IAssetServer + { + private IAssetReceiver _receiver; + private BlockingQueue _assetRequests; + private Thread _remoteAssetServerThread; + + + public RemoteAssetServer() + { + this._assetRequests = new BlockingQueue(); + this._remoteAssetServerThread = new Thread(new ThreadStart(RunRequests)); + this._remoteAssetServerThread.IsBackground = true; + this._remoteAssetServerThread.Start(); + } + + public void SetReceiver(IAssetReceiver receiver) + { + this._receiver = receiver; + } + + public void RequestAsset(LLUUID assetID, bool isTexture) + { + ARequest req = new ARequest(); + req.AssetID = assetID; + req.IsTexture = isTexture; + this._assetRequests.Enqueue(req); + } + + public void UpdateAsset(AssetBase asset) + { + + } + + public void UploadNewAsset(AssetBase asset) + { + + } + + private void RunRequests() + { + while(true) + { + //we need to add support for the asset server not knowing about a requested asset + ARequest req = this._assetRequests.Dequeue(); + LLUUID assetID = req.AssetID; + Console.WriteLine(" RemoteAssetServer- Got a AssetServer request, processing it"); + WebRequest AssetLoad = WebRequest.Create(OpenSim_Main.cfg.AssetURL + "getasset/" + OpenSim_Main.cfg.AssetSendKey + "/" + assetID + "/data"); + WebResponse AssetResponse = AssetLoad.GetResponse(); + byte[] idata = new byte[(int)AssetResponse.ContentLength]; + BinaryReader br = new BinaryReader(AssetResponse.GetResponseStream()); + idata = br.ReadBytes((int)AssetResponse.ContentLength); + br.Close(); + + AssetBase asset = new AssetBase(); + asset.FullID = assetID; + asset.Data = idata; + _receiver.AssetReceived(asset, req.IsTexture ); + } + } } public interface IAssetServer { - void SetReceiver(IAssetReceived receiver); - void RequestAsset(LLUUID assetID); + void SetReceiver(IAssetReceiver receiver); + void RequestAsset(LLUUID assetID, bool isTexture); void UpdateAsset(AssetBase asset); void UploadNewAsset(AssetBase asset); } - // could change to delegate - public interface IAssetReceived + // could change to delegate? + public interface IAssetReceiver { - void AssetReceived(AssetBase asset); + void AssetReceived(AssetBase asset, bool IsTexture); void AssetNotFound(AssetBase asset); } + + public struct ARequest + { + public LLUUID AssetID; + public bool IsTexture; + } } diff --git a/Main.cs b/Main.cs index 60de7c869a..742d3fa5af 100644 --- a/Main.cs +++ b/Main.cs @@ -40,6 +40,7 @@ using libsecondlife; using libsecondlife.Packets; using OpenSim.world; using OpenSim.GridServers; +using OpenSim.Assets; using PhysicsSystem; namespace OpenSim @@ -53,6 +54,7 @@ namespace OpenSim public static SimConfig cfg; public static World local_world; public static Grid gridServers; + public static AssetCache assetCache; //private static Thread MainListener; //private static Thread PingRespponder; public static Socket Server; @@ -101,6 +103,7 @@ namespace OpenSim LoginServer loginServer = new LoginServer(OpenSim_Main.gridServers.GridServer); loginServer.Startup(); } + assetCache = new AssetCache(OpenSim_Main.gridServers.AssetServer); sim.Startup(); while(true) { @@ -191,12 +194,12 @@ namespace OpenSim { if(sandbox) { - this.AssetServer =(IAssetServer) new LocalAssetServer(); + this.AssetServer =(IAssetServer) new LocalAssetServer(); //assets not implemented yet this.GridServer =(IGridServer) new LocalGridServer(); } else { - this.AssetServer =(IAssetServer) new LocalAssetServer(); //assets not implemented yet + this.AssetServer =(IAssetServer) new RemoteAssetServer(); //assets not implemented yet this.GridServer =(IGridServer) new RemoteGridServer(); } } diff --git a/OpenSimClient.cs b/OpenSimClient.cs index a8c06f488a..57189b0d5a 100644 --- a/OpenSimClient.cs +++ b/OpenSimClient.cs @@ -51,8 +51,7 @@ namespace OpenSim private UseCircuitCodePacket cirpack; private Thread ClientThread; private EndPoint userEP; - private BlockingQueue PacketQueue; - private BlockingQueue AssetRequests; + private BlockingQueue PacketQueue; private Dictionary PendingAcks = new Dictionary(); private Dictionary NeedAck = new Dictionary(); private System.Timers.Timer AckTimer; @@ -79,59 +78,6 @@ namespace OpenSim } } - public void AssetLoader() { - /* - Console.WriteLine("OpenSimClient.cs:AssetLoader() - Starting new thread"); - TransferRequestPacket reqPacket = AssetRequests.Dequeue(); - Console.WriteLine("OpenSimClient.cs:AssetLoader() - Got a request, processing it"); - LLUUID AssetID = new LLUUID(reqPacket.TransferInfo.Params, 0); - WebRequest AssetLoad = WebRequest.Create(OpenSim_Main.cfg.AssetURL + "getasset/" + OpenSim_Main.cfg.AssetSendKey + "/" + AssetID + "/data"); - WebResponse AssetResponse = AssetLoad.GetResponse(); - byte[] idata = new byte[(int)AssetResponse.ContentLength]; - BinaryReader br = new BinaryReader(AssetResponse.GetResponseStream()); - idata = br.ReadBytes((int)AssetResponse.ContentLength); - br.Close(); - - TransferInfoPacket Transfer = new TransferInfoPacket(); - Transfer.TransferInfo.ChannelType = 2; - Transfer.TransferInfo.Status = 0; - Transfer.TransferInfo.TargetType = 0; - Transfer.TransferInfo.Params = reqPacket.TransferInfo.Params; - Transfer.TransferInfo.Size = (int)AssetResponse.ContentLength; - Transfer.TransferInfo.TransferID = reqPacket.TransferInfo.TransferID; - - OutPacket(Transfer); - - TransferPacketPacket TransferPacket = new TransferPacketPacket(); - TransferPacket.TransferData.Packet = 0; - TransferPacket.TransferData.ChannelType = 2; - TransferPacket.TransferData.TransferID=reqPacket.TransferInfo.TransferID; - - if(AssetResponse.ContentLength>1000) { - byte[] chunk = new byte[1000]; - Array.Copy(idata,chunk,1000); - TransferPacket.TransferData.Data = chunk; - TransferPacket.TransferData.Status = 0; - OutPacket(TransferPacket); - - TransferPacket = new TransferPacketPacket(); - TransferPacket.TransferData.Packet = 1; - TransferPacket.TransferData.ChannelType = 2; - TransferPacket.TransferData.TransferID = reqPacket.TransferInfo.TransferID; - byte[] chunk1 = new byte[(idata.Length-1000)]; - Array.Copy(idata, 1000, chunk1, 0, chunk1.Length); - TransferPacket.TransferData.Data = chunk1; - TransferPacket.TransferData.Status = 1; - OutPacket(TransferPacket); - } else { - TransferPacket.TransferData.Status = 1; - TransferPacket.TransferData.Data = idata; - OutPacket(TransferPacket); - } - AssetResponse.Close(); - */ - } - public void ProcessInPacket(Packet Pack) { ack_pack(Pack); switch(Pack.Type) { @@ -147,11 +93,8 @@ namespace OpenSim break; case PacketType.TransferRequest: //Console.WriteLine("OpenSimClient.cs:ProcessInPacket() - Got transfer request"); - // We put transfer requests into a big queue and then spawn a thread for each new one - //TransferRequestPacket transfer = (TransferRequestPacket)Pack; - //AssetRequests.Enqueue(transfer); - //Thread AssetLoaderThread = new Thread(new ThreadStart(AssetLoader)); - //AssetLoaderThread.Start(); + TransferRequestPacket transfer = (TransferRequestPacket)Pack; + OpenSim_Main.assetCache.AddAssetRequest(this, transfer); break; case PacketType.AgentUpdate: ClientAvatar.HandleUpdate((AgentUpdatePacket)Pack); @@ -373,7 +316,6 @@ namespace OpenSim cirpack = initialcirpack; userEP = remoteEP; PacketQueue = new BlockingQueue(); - AssetRequests = new BlockingQueue(); AckTimer = new System.Timers.Timer(500); AckTimer.Elapsed += new ElapsedEventHandler(AckTimer_Elapsed); AckTimer.Start();