another attempt at fixing asset lockups

0.6.0-stable
MW 2008-02-27 16:20:45 +00:00
parent f218e7e090
commit aac7c1dda5
1 changed files with 250 additions and 227 deletions

View File

@ -69,12 +69,12 @@ namespace OpenSim.Framework.Communications.Cache
///
/// Assets requests which are waiting for asset server data. This includes texture requests
/// </summary>
private Dictionary<LLUUID, AssetRequest> RequestedAssets;
private Dictionary<LLUUID, AssetRequest> RequestedAssets;
/// <summary>
/// Asset requests with data which are ready to be sent back to requesters. This includes textures.
/// </summary>
private List<AssetRequest> AssetRequests;
private List<AssetRequest> AssetRequests;
/// <summary>
@ -94,8 +94,8 @@ namespace OpenSim.Framework.Communications.Cache
m_log.InfoFormat("Assets:{0} Textures:{1} AssetRequests:{2} RequestedAssets:{3} RequestLists:{4}",
Assets.Count,
Textures.Count,
AssetRequests.Count,
RequestedAssets.Count,
AssetRequests.Count,
RequestedAssets.Count,
RequestLists.Count);
int temporaryImages = 0;
@ -150,9 +150,9 @@ namespace OpenSim.Framework.Communications.Cache
{
Assets = new Dictionary<LLUUID, AssetInfo>();
Textures = new Dictionary<LLUUID, TextureImage>();
AssetRequests = new List<AssetRequest>();
AssetRequests = new List<AssetRequest>();
RequestedAssets = new Dictionary<LLUUID, AssetRequest>();
RequestedAssets = new Dictionary<LLUUID, AssetRequest>();
RequestLists = new Dictionary<LLUUID, AssetRequestsList>();
}
@ -168,32 +168,32 @@ namespace OpenSim.Framework.Communications.Cache
m_assetServer = assetServer;
m_assetServer.SetReceiver(this);
m_assetCacheThread = new Thread(new ThreadStart(RunAssetManager));
m_assetCacheThread.Name = "AssetCacheThread";
m_assetCacheThread.IsBackground = true;
m_assetCacheThread.Start();
OpenSim.Framework.ThreadTracker.Add(m_assetCacheThread);
m_assetCacheThread = new Thread(new ThreadStart(RunAssetManager));
m_assetCacheThread.Name = "AssetCacheThread";
m_assetCacheThread.IsBackground = true;
m_assetCacheThread.Start();
OpenSim.Framework.ThreadTracker.Add(m_assetCacheThread);
}
/// <summary>
/// Process the asset queue which holds data which is packeted up and sent
/// directly back to the client.
/// </summary>
public void RunAssetManager()
{
while (true)
public void RunAssetManager()
{
try
while (true)
{
ProcessAssetQueue();
Thread.Sleep(500);
}
catch (Exception e)
{
m_log.Error("[ASSET CACHE]: " + e.ToString());
try
{
ProcessAssetQueue();
Thread.Sleep(500);
}
catch (Exception e)
{
m_log.Error("[ASSET CACHE]: " + e.ToString());
}
}
}
}
/// <summary>
/// Only get an asset if we already have it in the cache.
@ -265,6 +265,7 @@ namespace OpenSim.Framework.Communications.Cache
AssetRequestsList requestList;
lock (RequestLists)
{
// m_log.Info("AssetCache: Lock taken on requestLists (GetAsset)");
if (RequestLists.TryGetValue(assetId, out requestList))
{
}
@ -274,6 +275,7 @@ namespace OpenSim.Framework.Communications.Cache
RequestLists.Add(assetId, requestList);
}
}
// m_log.Info("AssetCache: Lock released on requestLists (GetAsset)");
requestList.Requests.Add(req);
@ -447,19 +449,19 @@ namespace OpenSim.Framework.Communications.Cache
StatsManager.SimExtraStats.AddAsset(assetInf);
}
if (RequestedAssets.ContainsKey(assetInf.FullID))
{
#if DEBUG
//m_log.DebugFormat("[ASSET CACHE]: Moving {0} from RequestedAssets to AssetRequests", asset.FullID);
#endif
if (RequestedAssets.ContainsKey(assetInf.FullID))
{
#if DEBUG
//m_log.DebugFormat("[ASSET CACHE]: Moving {0} from RequestedAssets to AssetRequests", asset.FullID);
#endif
AssetRequest req = RequestedAssets[assetInf.FullID];
req.AssetInf = assetInf;
req.NumPackets = CalculateNumPackets(assetInf.Data);
AssetRequest req = RequestedAssets[assetInf.FullID];
req.AssetInf = assetInf;
req.NumPackets = CalculateNumPackets(assetInf.Data);
RequestedAssets.Remove(assetInf.FullID);
AssetRequests.Add(req);
}
RequestedAssets.Remove(assetInf.FullID);
AssetRequests.Add(req);
}
}
}
@ -469,9 +471,11 @@ namespace OpenSim.Framework.Communications.Cache
AssetRequestsList reqList = null;
lock (RequestLists)
{
//m_log.Info("AssetCache: Lock taken on requestLists (AssetReceived #1)");
reqList = RequestLists[asset.FullID];
}
//m_log.Info("AssetCache: Lock released on requestLists (AssetReceived #1)");
if (reqList != null)
{
//making a copy of the list is not ideal
@ -486,8 +490,10 @@ namespace OpenSim.Framework.Communications.Cache
lock (RequestLists)
{
// m_log.Info("AssetCache: Lock taken on requestLists (AssetReceived #2)");
RequestLists.Remove(asset.FullID);
}
//m_log.Info("AssetCache: Lock released on requestLists (AssetReceived #2)");
foreach (NewAssetRequest req in theseRequests)
{
@ -501,217 +507,234 @@ namespace OpenSim.Framework.Communications.Cache
// See IAssetReceiver
public void AssetNotFound(LLUUID assetID)
{
m_log.WarnFormat("[ASSET CACHE]: AssetNotFound for {0}", assetID);
// m_log.WarnFormat("[ASSET CACHE]: AssetNotFound for {0}", assetID);
// Notify requesters for this asset
AssetRequestsList reqList = null;
lock (RequestLists)
{
// m_log.Info("AssetCache: Lock taken on requestLists (AssetNotFound #1)");
if (RequestLists.ContainsKey(assetID))
{
AssetRequestsList reqList = RequestLists[assetID];
foreach (NewAssetRequest req in reqList.Requests)
{
req.Callback(assetID, null);
}
reqList = RequestLists[assetID];
}
}
// m_log.Info("AssetCache: Lock released on requestLists (AssetNotFound #1)");
if (reqList != null)
{
List<NewAssetRequest> theseRequests = new List<NewAssetRequest>(reqList.Requests);
reqList.Requests.Clear();
lock (RequestLists)
{
// m_log.Info("AssetCache: Lock taken on requestLists (AssetNotFound #2)");
RequestLists.Remove(assetID);
}
}
}
// m_log.Info("AssetCache: Lock released on requestLists (AssetNotFound #2)");
/// <summary>
/// Calculate the number of packets required to send the asset to the client.
/// </summary>
/// <param name="data"></param>
/// <returns></returns>
private int CalculateNumPackets(byte[] data)
{
const uint m_maxPacketSize = 600;
int numPackets = 1;
if (data.LongLength > m_maxPacketSize)
{
// over max number of bytes so split up file
long restData = data.LongLength - m_maxPacketSize;
int restPackets = (int)((restData + m_maxPacketSize - 1) / m_maxPacketSize);
numPackets += restPackets;
}
return numPackets;
}
/// <summary>
/// Make an asset request the result of which will be packeted up and sent directly back to the client.
/// </summary>
/// <param name="userInfo"></param>
/// <param name="transferRequest"></param>
public void AddAssetRequest(IClientAPI userInfo, TransferRequestPacket transferRequest)
{
LLUUID requestID = null;
byte source = 2;
if (transferRequest.TransferInfo.SourceType == 2)
{
//direct asset request
requestID = new LLUUID(transferRequest.TransferInfo.Params, 0);
}
else if (transferRequest.TransferInfo.SourceType == 3)
{
//inventory asset request
requestID = new LLUUID(transferRequest.TransferInfo.Params, 80);
source = 3;
//Console.WriteLine("asset request " + requestID);
}
//check to see if asset is in local cache, if not we need to request it from asset server.
//Console.WriteLine("asset request " + requestID);
if (!Assets.ContainsKey(requestID))
{
//not found asset
// so request from asset server
if (!RequestedAssets.ContainsKey(requestID))
foreach (NewAssetRequest req in theseRequests)
{
AssetRequest request = new AssetRequest();
request.RequestUser = userInfo;
request.RequestAssetID = requestID;
request.TransferRequestID = transferRequest.TransferInfo.TransferID;
request.AssetRequestSource = source;
request.Params = transferRequest.TransferInfo.Params;
RequestedAssets.Add(requestID, request);
m_assetServer.RequestAsset(requestID, false);
}
return;
}
//it is in our cache
AssetInfo asset = Assets[requestID];
// add to the AssetRequests list
AssetRequest req = new AssetRequest();
req.RequestUser = userInfo;
req.RequestAssetID = requestID;
req.TransferRequestID = transferRequest.TransferInfo.TransferID;
req.AssetRequestSource = source;
req.Params = transferRequest.TransferInfo.Params;
req.AssetInf = asset;
req.NumPackets = CalculateNumPackets(asset.Data);
AssetRequests.Add(req);
}
/// <summary>
/// Process the asset queue which sends packets directly back to the client.
/// </summary>
private void ProcessAssetQueue()
{
//should move the asset downloading to a module, like has been done with texture downloading
if (AssetRequests.Count == 0)
{
//no requests waiting
return;
}
// if less than 5, do all of them
int num = Math.Min(5, AssetRequests.Count);
AssetRequest req;
for (int i = 0; i < num; i++)
{
req = (AssetRequest)AssetRequests[i];
//Console.WriteLine("sending asset " + req.RequestAssetID);
TransferInfoPacket Transfer = new TransferInfoPacket();
Transfer.TransferInfo.ChannelType = 2;
Transfer.TransferInfo.Status = 0;
Transfer.TransferInfo.TargetType = 0;
if (req.AssetRequestSource == 2)
{
Transfer.TransferInfo.Params = new byte[20];
Array.Copy(req.RequestAssetID.GetBytes(), 0, Transfer.TransferInfo.Params, 0, 16);
int assType = (int)req.AssetInf.Type;
Array.Copy(Helpers.IntToBytes(assType), 0, Transfer.TransferInfo.Params, 16, 4);
}
else if (req.AssetRequestSource == 3)
{
Transfer.TransferInfo.Params = req.Params;
// Transfer.TransferInfo.Params = new byte[100];
//Array.Copy(req.RequestUser.AgentId.GetBytes(), 0, Transfer.TransferInfo.Params, 0, 16);
//Array.Copy(req.RequestUser.SessionId.GetBytes(), 0, Transfer.TransferInfo.Params, 16, 16);
}
Transfer.TransferInfo.Size = (int)req.AssetInf.Data.Length;
Transfer.TransferInfo.TransferID = req.TransferRequestID;
req.RequestUser.OutPacket(Transfer, ThrottleOutPacketType.Asset);
if (req.NumPackets == 1)
{
TransferPacketPacket TransferPacket = new TransferPacketPacket();
TransferPacket.TransferData.Packet = 0;
TransferPacket.TransferData.ChannelType = 2;
TransferPacket.TransferData.TransferID = req.TransferRequestID;
TransferPacket.TransferData.Data = req.AssetInf.Data;
TransferPacket.TransferData.Status = 1;
req.RequestUser.OutPacket(TransferPacket, ThrottleOutPacketType.Asset);
}
else
{
int processedLength = 0;
// libsecondlife hardcodes 1500 as the maximum data chunk size
int maxChunkSize = 1250;
int packetNumber = 0;
while (processedLength < req.AssetInf.Data.Length)
{
TransferPacketPacket TransferPacket = new TransferPacketPacket();
TransferPacket.TransferData.Packet = packetNumber;
TransferPacket.TransferData.ChannelType = 2;
TransferPacket.TransferData.TransferID = req.TransferRequestID;
int chunkSize = Math.Min(req.AssetInf.Data.Length - processedLength, maxChunkSize);
byte[] chunk = new byte[chunkSize];
Array.Copy(req.AssetInf.Data, processedLength, chunk, 0, chunk.Length);
TransferPacket.TransferData.Data = chunk;
// 0 indicates more packets to come, 1 indicates last packet
if (req.AssetInf.Data.Length - processedLength > maxChunkSize)
{
TransferPacket.TransferData.Status = 0;
}
else
{
TransferPacket.TransferData.Status = 1;
}
req.RequestUser.OutPacket(TransferPacket, ThrottleOutPacketType.Asset);
processedLength += chunkSize;
packetNumber++;
}
req.Callback(assetID, null);
}
}
//remove requests that have been completed
for (int i = 0; i < num; i++)
{
AssetRequests.RemoveAt(0);
}
}
public class AssetRequest
{
public IClientAPI RequestUser;
public LLUUID RequestAssetID;
public AssetInfo AssetInf;
public TextureImage ImageInfo;
public LLUUID TransferRequestID;
public long DataPointer = 0;
public int NumPackets = 0;
public int PacketCounter = 0;
public bool IsTextureRequest;
public byte AssetRequestSource = 2;
public byte[] Params = null;
//public bool AssetInCache;
//public int TimeRequested;
public int DiscardLevel = -1;
/// <summary>
/// Calculate the number of packets required to send the asset to the client.
/// </summary>
/// <param name="data"></param>
/// <returns></returns>
private int CalculateNumPackets(byte[] data)
{
const uint m_maxPacketSize = 600;
int numPackets = 1;
if (data.LongLength > m_maxPacketSize)
{
// over max number of bytes so split up file
long restData = data.LongLength - m_maxPacketSize;
int restPackets = (int)((restData + m_maxPacketSize - 1) / m_maxPacketSize);
numPackets += restPackets;
}
return numPackets;
}
/// <summary>
/// Make an asset request the result of which will be packeted up and sent directly back to the client.
/// </summary>
/// <param name="userInfo"></param>
/// <param name="transferRequest"></param>
public void AddAssetRequest(IClientAPI userInfo, TransferRequestPacket transferRequest)
{
LLUUID requestID = null;
byte source = 2;
if (transferRequest.TransferInfo.SourceType == 2)
{
//direct asset request
requestID = new LLUUID(transferRequest.TransferInfo.Params, 0);
}
else if (transferRequest.TransferInfo.SourceType == 3)
{
//inventory asset request
requestID = new LLUUID(transferRequest.TransferInfo.Params, 80);
source = 3;
//Console.WriteLine("asset request " + requestID);
}
//check to see if asset is in local cache, if not we need to request it from asset server.
//Console.WriteLine("asset request " + requestID);
if (!Assets.ContainsKey(requestID))
{
//not found asset
// so request from asset server
if (!RequestedAssets.ContainsKey(requestID))
{
AssetRequest request = new AssetRequest();
request.RequestUser = userInfo;
request.RequestAssetID = requestID;
request.TransferRequestID = transferRequest.TransferInfo.TransferID;
request.AssetRequestSource = source;
request.Params = transferRequest.TransferInfo.Params;
RequestedAssets.Add(requestID, request);
m_assetServer.RequestAsset(requestID, false);
}
return;
}
//it is in our cache
AssetInfo asset = Assets[requestID];
// add to the AssetRequests list
AssetRequest req = new AssetRequest();
req.RequestUser = userInfo;
req.RequestAssetID = requestID;
req.TransferRequestID = transferRequest.TransferInfo.TransferID;
req.AssetRequestSource = source;
req.Params = transferRequest.TransferInfo.Params;
req.AssetInf = asset;
req.NumPackets = CalculateNumPackets(asset.Data);
AssetRequests.Add(req);
}
/// <summary>
/// Process the asset queue which sends packets directly back to the client.
/// </summary>
private void ProcessAssetQueue()
{
//should move the asset downloading to a module, like has been done with texture downloading
if (AssetRequests.Count == 0)
{
//no requests waiting
return;
}
// if less than 5, do all of them
int num = Math.Min(5, AssetRequests.Count);
AssetRequest req;
for (int i = 0; i < num; i++)
{
req = (AssetRequest)AssetRequests[i];
//Console.WriteLine("sending asset " + req.RequestAssetID);
TransferInfoPacket Transfer = new TransferInfoPacket();
Transfer.TransferInfo.ChannelType = 2;
Transfer.TransferInfo.Status = 0;
Transfer.TransferInfo.TargetType = 0;
if (req.AssetRequestSource == 2)
{
Transfer.TransferInfo.Params = new byte[20];
Array.Copy(req.RequestAssetID.GetBytes(), 0, Transfer.TransferInfo.Params, 0, 16);
int assType = (int)req.AssetInf.Type;
Array.Copy(Helpers.IntToBytes(assType), 0, Transfer.TransferInfo.Params, 16, 4);
}
else if (req.AssetRequestSource == 3)
{
Transfer.TransferInfo.Params = req.Params;
// Transfer.TransferInfo.Params = new byte[100];
//Array.Copy(req.RequestUser.AgentId.GetBytes(), 0, Transfer.TransferInfo.Params, 0, 16);
//Array.Copy(req.RequestUser.SessionId.GetBytes(), 0, Transfer.TransferInfo.Params, 16, 16);
}
Transfer.TransferInfo.Size = (int)req.AssetInf.Data.Length;
Transfer.TransferInfo.TransferID = req.TransferRequestID;
req.RequestUser.OutPacket(Transfer, ThrottleOutPacketType.Asset);
if (req.NumPackets == 1)
{
TransferPacketPacket TransferPacket = new TransferPacketPacket();
TransferPacket.TransferData.Packet = 0;
TransferPacket.TransferData.ChannelType = 2;
TransferPacket.TransferData.TransferID = req.TransferRequestID;
TransferPacket.TransferData.Data = req.AssetInf.Data;
TransferPacket.TransferData.Status = 1;
req.RequestUser.OutPacket(TransferPacket, ThrottleOutPacketType.Asset);
}
else
{
int processedLength = 0;
// libsecondlife hardcodes 1500 as the maximum data chunk size
int maxChunkSize = 1250;
int packetNumber = 0;
while (processedLength < req.AssetInf.Data.Length)
{
TransferPacketPacket TransferPacket = new TransferPacketPacket();
TransferPacket.TransferData.Packet = packetNumber;
TransferPacket.TransferData.ChannelType = 2;
TransferPacket.TransferData.TransferID = req.TransferRequestID;
int chunkSize = Math.Min(req.AssetInf.Data.Length - processedLength, maxChunkSize);
byte[] chunk = new byte[chunkSize];
Array.Copy(req.AssetInf.Data, processedLength, chunk, 0, chunk.Length);
TransferPacket.TransferData.Data = chunk;
// 0 indicates more packets to come, 1 indicates last packet
if (req.AssetInf.Data.Length - processedLength > maxChunkSize)
{
TransferPacket.TransferData.Status = 0;
}
else
{
TransferPacket.TransferData.Status = 1;
}
req.RequestUser.OutPacket(TransferPacket, ThrottleOutPacketType.Asset);
processedLength += chunkSize;
packetNumber++;
}
}
}
//remove requests that have been completed
for (int i = 0; i < num; i++)
{
AssetRequests.RemoveAt(0);
}
}
public class AssetRequest
{
public IClientAPI RequestUser;
public LLUUID RequestAssetID;
public AssetInfo AssetInf;
public TextureImage ImageInfo;
public LLUUID TransferRequestID;
public long DataPointer = 0;
public int NumPackets = 0;
public int PacketCounter = 0;
public bool IsTextureRequest;
public byte AssetRequestSource = 2;
public byte[] Params = null;
//public bool AssetInCache;
//public int TimeRequested;
public int DiscardLevel = -1;
public AssetRequest()
{
}
}
public AssetRequest()
{
}
}
public class AssetInfo : AssetBase
{