more changes on http server low level

master
UbitUmarov 2020-04-08 14:35:31 +01:00
parent f0f067d05c
commit f976d10de2
6 changed files with 175 additions and 126 deletions

View File

@ -93,17 +93,17 @@ namespace OSHttpServer
{
while (!m_shuttingDown)
{
m_processWaitEven.WaitOne(100);
m_processWaitEven.WaitOne(500);
if(m_shuttingDown)
return;
double now = GetTimeStampMS();
double now = GetTimeStamp();
if(m_contexts.Count > 0)
{
ProcessSendQueues(now);
if (now - m_lastTimeOutCheckTime > 1000)
if (now - m_lastTimeOutCheckTime > 1.0)
{
ProcessContextTimeouts();
m_lastTimeOutCheckTime = now;
@ -158,8 +158,8 @@ namespace OSHttpServer
if(curConcurrentLimit > inqueues)
curConcurrentLimit = inqueues;
if (dt > 0.1)
dt = 0.1;
if (dt > 0.5)
dt = 0.5;
dt /= curConcurrentLimit;
int curbytesLimit = (int)(m_maxBandWidth * dt);
@ -250,68 +250,64 @@ namespace OSHttpServer
if (context.contextID < 0 || context.StopMonitoring || context.StreamPassedOff)
return true;
// Now we start checking for actual timeouts
int nowMS = EnvironmentTickCount();
// First we check that we got at least one line within context.TimeoutFirstLine milliseconds
// First we check first contact line
if (!context.FirstRequestLineReceived)
{
if (EnvironmentTickCountAdd(context.TimeoutFirstLine, context.MonitorStartMS) <= EnvironmentTickCount())
if (EnvironmentTickCountAdd(context.TimeoutFirstLine, context.LastActivityTimeMS) < nowMS)
{
disconnectError = SocketError.TimedOut;
context.MonitorStartMS = 0;
return true;
}
return false;
}
// First we check first contact request
if (!context.FullRequestReceived)
{
if (EnvironmentTickCountAdd(context.TimeoutRequestReceived, context.MonitorStartMS) <= EnvironmentTickCount())
if (EnvironmentTickCountAdd(context.TimeoutRequestReceived, context.LastActivityTimeMS) < nowMS)
{
disconnectError = SocketError.TimedOut;
context.MonitorStartMS = 0;
return true;
}
}
//
if (!context.FullRequestProcessed)
{
if (EnvironmentTickCountAdd(context.TimeoutFullRequestProcessed, context.MonitorStartMS) <= EnvironmentTickCount())
{
disconnectError = SocketError.TimedOut;
context.MonitorStartMS = 0;
return true;
}
return false;
}
if (context.TriggerKeepalive)
{
context.TriggerKeepalive = false;
context.MonitorKeepaliveMS = EnvironmentTickCount();
context.MonitorKeepaliveStartMS = nowMS;
return false;
}
if (context.FullRequestProcessed && context.MonitorKeepaliveMS == 0)
return true;
if (context.MonitorKeepaliveStartMS != 0)
{
if (EnvironmentTickCountAdd(context.TimeoutKeepAlive, context.MonitorKeepaliveStartMS) < nowMS)
{
disconnectError = SocketError.TimedOut;
context.MonitorKeepaliveStartMS = 0;
return true;
}
return false;
}
if (context.MonitorKeepaliveMS != 0 &&
EnvironmentTickCountAdd(context.TimeoutKeepAlive, context.MonitorKeepaliveMS) <= EnvironmentTickCount())
if (EnvironmentTickCountAdd(context.TimeoutMaxIdle, context.LastActivityTimeMS) < nowMS)
{
disconnectError = SocketError.TimedOut;
context.MonitorStartMS = 0;
context.MonitorKeepaliveMS = 0;
context.MonitorKeepaliveStartMS = 0;
return true;
}
return false;
}
public static void StartMonitoringContext(HttpClientContext context)
{
context.MonitorStartMS = EnvironmentTickCount();
context.LastActivityTimeMS = EnvironmentTickCount();
m_contexts.Enqueue(context);
}
public static void EnqueueSend(HttpClientContext context, int priority)
public static void EnqueueSend(HttpClientContext context, int priority, bool notThrottled = true)
{
switch(priority)
{
@ -327,7 +323,8 @@ namespace OSHttpServer
default:
return;
}
m_processWaitEven.Set();
if(notThrottled)
m_processWaitEven.Set();
}
public static void ContextEnterActiveSend()

View File

@ -20,7 +20,7 @@ namespace OSHttpServer
/// </remarks>
public class HttpClientContext : IHttpClientContext, IDisposable
{
const int MAXPIPEREQUESTS = 5;
const int MAXREQUESTS = 20;
const int MAXKEEPALIVE = 60000;
static private int basecontextID;
@ -36,22 +36,20 @@ namespace OSHttpServer
public bool Available = true;
public bool StreamPassedOff = false;
public int MonitorStartMS = 0;
public int MonitorKeepaliveMS = 0;
public int LastActivityTimeMS = 0;
public int MonitorKeepaliveStartMS = 0;
public bool TriggerKeepalive = false;
public int TimeoutFirstLine = 70000; // 70 seconds
public int TimeoutRequestReceived = 180000; // 180 seconds
public int TimeoutFirstLine = 10000; // 10 seconds
public int TimeoutRequestReceived = 30000; // 30 seconds
// The difference between this and request received is on POST more time is needed before we get the full request.
public int TimeoutFullRequestProcessed = 600000; // 10 minutes
public int TimeoutMaxIdle = 180000; // 3 minutes
public int m_TimeoutKeepAlive = MAXKEEPALIVE; // 400 seconds before keepalive timeout
// public int TimeoutKeepAlive = 120000; // 400 seconds before keepalive timeout
public int m_maxPipeRequests = MAXPIPEREQUESTS;
public int m_maxRequests = MAXREQUESTS;
public bool FirstRequestLineReceived;
public bool FullRequestReceived;
public bool FullRequestProcessed;
private bool isSendingResponse = false;
@ -68,15 +66,15 @@ namespace OSHttpServer
}
}
public int MaxPipeRequests
public int MaxRequests
{
get { return m_maxPipeRequests; }
get { return m_maxRequests; }
set
{
if(value <= 1)
m_maxPipeRequests = 1;
m_maxRequests = 1;
else
m_maxPipeRequests = value > MAXPIPEREQUESTS ? MAXPIPEREQUESTS : value;
m_maxRequests = value > MAXREQUESTS ? MAXREQUESTS : value;
}
}
@ -118,7 +116,7 @@ namespace OSHttpServer
m_parser.BodyBytesReceived += OnBodyBytesReceived;
m_currentRequest = new HttpRequest(this);
IsSecured = secured;
_stream = stream;
m_stream = stream;
m_sock = sock;
m_bufferSize = 8196;
@ -128,7 +126,7 @@ namespace OSHttpServer
SSLCommonName = "";
if (secured)
{
SslStream _ssl = (SslStream)_stream;
SslStream _ssl = (SslStream)m_stream;
X509Certificate _cert1 = _ssl.RemoteCertificate;
if (_cert1 != null)
{
@ -150,7 +148,7 @@ namespace OSHttpServer
if (contextID < 0)
return false;
if (Stream == null || m_sock == null || !m_sock.Connected)
if (m_stream == null || m_sock == null || !m_sock.Connected)
return false;
return true;
@ -191,9 +189,11 @@ namespace OSHttpServer
m_currentRequest.UriPath = e.UriPath;
m_currentRequest.AddHeader("remote_addr", RemoteAddress);
m_currentRequest.AddHeader("remote_port", RemotePort);
FirstRequestLineReceived = true;
TriggerKeepalive = false;
MonitorKeepaliveMS = 0;
MonitorKeepaliveStartMS = 0;
LastActivityTimeMS = ContextTimeoutManager.EnvironmentTickCount();
}
/// <summary>
@ -218,10 +218,10 @@ namespace OSHttpServer
if (StreamPassedOff)
return;
if (Stream != null)
if (m_stream != null)
{
Stream.Close();
Stream = null;
m_stream.Close();
m_stream = null;
m_sock = null;
}
@ -233,10 +233,9 @@ namespace OSHttpServer
FirstRequestLineReceived = false;
FullRequestReceived = false;
FullRequestProcessed = false;
MonitorStartMS = 0;
LastActivityTimeMS = 0;
StopMonitoring = true;
MonitorKeepaliveMS = 0;
MonitorKeepaliveStartMS = 0;
TriggerKeepalive = false;
isSendingResponse = false;
@ -283,15 +282,15 @@ namespace OSHttpServer
}
}
private Stream _stream;
private Stream m_stream;
/// <summary>
/// Gets or sets the network stream.
/// </summary>
internal Stream Stream
{
get { return _stream; }
set { _stream = value; }
get { return m_stream; }
set { m_stream = value; }
}
/// <summary>
@ -315,12 +314,12 @@ namespace OSHttpServer
{
try
{
if (Stream != null)
if (m_stream != null)
{
if (error == SocketError.Success)
Stream.Flush();
Stream.Close();
Stream = null;
m_stream.Flush();
m_stream.Close();
m_stream = null;
}
m_sock = null;
}
@ -339,11 +338,11 @@ namespace OSHttpServer
try
{
int bytesRead = 0;
if (Stream == null)
if (m_stream == null)
return;
try
{
bytesRead = Stream.EndRead(ar);
bytesRead = m_stream.EndRead(ar);
}
catch (NullReferenceException)
{
@ -357,7 +356,7 @@ namespace OSHttpServer
return;
}
if(m_maxPipeRequests <= 0)
if(m_maxRequests <= 0)
return;
m_ReceiveBytesLeft += bytesRead;
@ -367,7 +366,7 @@ namespace OSHttpServer
}
int offset = m_parser.Parse(m_ReceiveBuffer, 0, m_ReceiveBytesLeft);
if (Stream == null)
if (m_stream == null)
return; // "Connection: Close" in effect.
// try again to see if we can parse another message (check parser to see if it is looking for a new message)
@ -378,7 +377,7 @@ namespace OSHttpServer
{
nextOffset = m_parser.Parse(m_ReceiveBuffer, offset, nextBytesleft);
if (Stream == null)
if (m_stream == null)
return; // "Connection: Close" in effect.
if (nextOffset == 0)
@ -393,7 +392,7 @@ namespace OSHttpServer
Buffer.BlockCopy(m_ReceiveBuffer, offset, m_ReceiveBuffer, 0, m_ReceiveBytesLeft - offset);
m_ReceiveBytesLeft -= offset;
if (Stream != null && Stream.CanRead)
if (m_stream != null && m_stream.CanRead)
{
if (!StreamPassedOff)
Stream.BeginRead(m_ReceiveBuffer, m_ReceiveBytesLeft, m_ReceiveBuffer.Length - m_ReceiveBytesLeft, OnReceive, null);
@ -449,10 +448,10 @@ namespace OSHttpServer
{
while(true)
{
if (_stream == null || !_stream.CanRead)
if (m_stream == null || !m_stream.CanRead)
return;
int bytesRead = await _stream.ReadAsync(m_ReceiveBuffer, m_ReceiveBytesLeft, m_ReceiveBuffer.Length - m_ReceiveBytesLeft).ConfigureAwait(false);
int bytesRead = await m_stream.ReadAsync(m_ReceiveBuffer, m_ReceiveBytesLeft, m_ReceiveBuffer.Length - m_ReceiveBytesLeft).ConfigureAwait(false);
if (bytesRead == 0)
{
@ -465,7 +464,7 @@ namespace OSHttpServer
throw new BadRequestException("HTTP header Too large: " + m_ReceiveBytesLeft);
int offset = m_parser.Parse(m_ReceiveBuffer, 0, m_ReceiveBytesLeft);
if (Stream == null)
if (m_stream == null)
return; // "Connection: Close" in effect.
// try again to see if we can parse another message (check parser to see if it is looking for a new message)
@ -476,7 +475,7 @@ namespace OSHttpServer
{
nextOffset = m_parser.Parse(m_ReceiveBuffer, offset, nextBytesleft);
if (Stream == null)
if (m_stream == null)
return; // "Connection: Close" in effect.
if (nextOffset == 0)
@ -536,13 +535,14 @@ namespace OSHttpServer
private void OnRequestCompleted(object source, EventArgs args)
{
TriggerKeepalive = false;
MonitorKeepaliveMS = 0;
MonitorKeepaliveStartMS = 0;
FullRequestReceived = true;
LastActivityTimeMS = ContextTimeoutManager.EnvironmentTickCount();
if (m_maxPipeRequests == 0)
if (m_maxRequests == 0)
return;
if(--m_maxPipeRequests == 0)
if(--m_maxRequests == 0)
m_currentRequest.Connection = ConnectionType.Close;
// load cookies if they exist
@ -574,13 +574,9 @@ namespace OSHttpServer
}
}
public void ReqResponseAboutToSend(uint requestID)
{
isSendingResponse = true;
}
public void StartSendResponse(HttpResponse response)
{
LastActivityTimeMS = ContextTimeoutManager.EnvironmentTickCount();
isSendingResponse = true;
m_currentResponse = response;
ContextTimeoutManager.EnqueueSend(this, response.Priority);
@ -595,19 +591,19 @@ namespace OSHttpServer
if(!CanSend())
return false;
m_currentResponse?.SendNextAsync(bytesLimit);
return false;
}
public void ContinueSendResponse()
public void ContinueSendResponse(bool notThrottled)
{
if(m_currentResponse == null)
return;
ContextTimeoutManager.EnqueueSend(this, m_currentResponse.Priority);
ContextTimeoutManager.EnqueueSend(this, m_currentResponse.Priority, notThrottled);
}
public void ReqResponseSent(uint requestID, ConnectionType ctype)
public void EndSendResponse(uint requestID, ConnectionType ctype)
{
isSendingResponse = false;
m_currentResponse?.Clear();
@ -626,6 +622,7 @@ namespace OSHttpServer
Disconnect(SocketError.Success);
else
{
LastActivityTimeMS = ContextTimeoutManager.EnvironmentTickCount();
lock (requestsInServiceIDs)
{
if (requestsInServiceIDs.Count == 0)
@ -645,22 +642,23 @@ namespace OSHttpServer
/// <exception cref="ArgumentException">If <paramref name="httpVersion"/> is invalid.</exception>
public void Respond(string httpVersion, HttpStatusCode statusCode, string reason, string body, string contentType)
{
LastActivityTimeMS = ContextTimeoutManager.EnvironmentTickCount();
if (string.IsNullOrEmpty(contentType))
contentType = "text/html";
if (string.IsNullOrEmpty(reason))
reason = statusCode.ToString();
string response = string.IsNullOrEmpty(body)
? httpVersion + " " + (int)statusCode + " " + reason + "\r\n\r\n"
: string.Format("{0} {1} {2}\r\nContent-Type: {5}\r\nContent-Length: {3}\r\n\r\n{4}",
byte[] buffer;
if(string.IsNullOrEmpty(body))
buffer = Encoding.ASCII.GetBytes(httpVersion + " " + (int)statusCode + " " + reason ?? statusCode.ToString() + "\r\n\r\n");
else
buffer = Encoding.UTF8.GetBytes(
string.Format("{0} {1} {2}\r\nContent-Type: {5}\r\nContent-Length: {3}\r\n\r\n{4}",
httpVersion, (int)statusCode, reason ?? statusCode.ToString(),
body.Length, body, contentType);
byte[] buffer = Encoding.ASCII.GetBytes(response);
body.Length, body, contentType));
Send(buffer);
if (m_currentRequest.Connection == ConnectionType.Close)
FullRequestProcessed = true;
}
/// <summary>
@ -671,6 +669,7 @@ namespace OSHttpServer
/// <param name="reason">reason for the status code.</param>
public void Respond(string httpVersion, HttpStatusCode statusCode, string reason)
{
Respond(httpVersion, statusCode, reason, null, null);
}
@ -699,7 +698,7 @@ namespace OSHttpServer
public bool Send(byte[] buffer, int offset, int size)
{
if (Stream == null || m_sock == null || !m_sock.Connected)
if (m_stream == null || m_sock == null || !m_sock.Connected)
return false;
if (offset + size > buffer.Length)
@ -710,14 +709,14 @@ namespace OSHttpServer
{
try
{
Stream.Write(buffer, offset, size);
m_stream.Write(buffer, offset, size);
}
catch
{
ok = false;
}
if (!ok && Stream != null)
if (!ok && m_stream != null)
Disconnect(SocketError.NoRecovery);
return ok;
}
@ -725,7 +724,7 @@ namespace OSHttpServer
public async Task<bool> SendAsync(byte[] buffer, int offset, int size)
{
if (Stream == null || m_sock == null || !m_sock.Connected)
if (m_stream == null || m_sock == null || !m_sock.Connected)
return false;
if (offset + size > buffer.Length)
@ -735,7 +734,7 @@ namespace OSHttpServer
ContextTimeoutManager.ContextEnterActiveSend();
try
{
await Stream.WriteAsync(buffer, offset, size).ConfigureAwait(false);
await m_stream.WriteAsync(buffer, offset, size).ConfigureAwait(false);
}
catch
{
@ -744,7 +743,7 @@ namespace OSHttpServer
ContextTimeoutManager.ContextLeaveActiveSend();
if (!ok && Stream != null)
if (!ok && m_stream != null)
Disconnect(SocketError.NoRecovery);
return ok;
}
@ -770,7 +769,7 @@ namespace OSHttpServer
m_parser.BodyBytesReceived -= OnBodyBytesReceived;
m_parser.Clear();
return new HTTPNetworkContext() { Socket = m_sock, Stream = _stream as NetworkStream };
return new HTTPNetworkContext() { Socket = m_sock, Stream = m_stream as NetworkStream };
}
public void Dispose()

View File

@ -10,6 +10,8 @@ namespace OSHttpServer
{
public class HttpResponse : IHttpResponse
{
public event EventHandler<BandWitdhEventArgs> BandWitdhEvent;
private const string DefaultContentType = "text/html;charset=UTF-8";
private readonly IHttpClientContext m_context;
private readonly ResponseCookies m_cookies = new ResponseCookies();
@ -245,9 +247,9 @@ namespace OSHttpServer
sb.Append("Server: OSWebServer\r\n");
int keepaliveS = m_context.TimeoutKeepAlive / 1000;
if (Connection == ConnectionType.KeepAlive && keepaliveS > 0 && m_context.MaxPipeRequests > 0)
if (Connection == ConnectionType.KeepAlive && keepaliveS > 0 && m_context.MaxRequests > 0)
{
sb.AppendFormat("Keep-Alive:timeout={0}, max={1}\r\n", keepaliveS, m_context.MaxPipeRequests);
sb.AppendFormat("Keep-Alive:timeout={0}, max={1}\r\n", keepaliveS, m_context.MaxRequests);
sb.Append("Connection: Keep-Alive\r\n");
}
else
@ -282,7 +284,7 @@ namespace OSHttpServer
if (Sent)
throw new InvalidOperationException("Everything have already been sent.");
if (m_context.MaxPipeRequests == 0 || m_keepAlive == 0)
if (m_context.MaxRequests == 0 || m_keepAlive == 0)
{
Connection = ConnectionType.Close;
m_context.TimeoutKeepAlive = 0;
@ -326,7 +328,12 @@ namespace OSHttpServer
{
if(!await m_context.SendAsync(m_headerBytes, 0, m_headerBytes.Length).ConfigureAwait(false))
{
if(m_body != null)
if (m_context.CanSend())
{
m_context.ContinueSendResponse(true);
return;
}
if (m_body != null)
m_body.Dispose();
RawBuffer = null;
Sent = true;
@ -336,7 +343,7 @@ namespace OSHttpServer
m_headerBytes = null;
if(bytesLimit <= 0)
{
m_context.ContinueSendResponse();
m_context.ContinueSendResponse(true);
return;
}
}
@ -345,21 +352,34 @@ namespace OSHttpServer
{
if (RawBufferLen > 0)
{
if(BandWitdhEvent!=null)
bytesLimit = CheckBandwidth(RawBufferLen, bytesLimit);
bool sendRes;
if(RawBufferLen > bytesLimit)
{
sendRes = await m_context.SendAsync(RawBuffer, RawBufferStart, bytesLimit).ConfigureAwait(false);
RawBufferLen -= bytesLimit;
RawBufferStart += bytesLimit;
sendRes = (await m_context.SendAsync(RawBuffer, RawBufferStart, bytesLimit).ConfigureAwait(false));
if (sendRes)
{
RawBufferLen -= bytesLimit;
RawBufferStart += bytesLimit;
}
}
else
{
sendRes = await m_context.SendAsync(RawBuffer, RawBufferStart, RawBufferLen).ConfigureAwait(false);
RawBufferLen = 0;
if(sendRes)
RawBufferLen = 0;
}
if (!sendRes)
{
if (m_context.CanSend())
{
m_context.ContinueSendResponse(true);
return;
}
RawBuffer = null;
if(m_body != null)
Body.Dispose();
@ -371,7 +391,7 @@ namespace OSHttpServer
RawBuffer = null;
else
{
m_context.ContinueSendResponse();
m_context.ContinueSendResponse(true);
return;
}
}
@ -391,17 +411,26 @@ namespace OSHttpServer
if (RawBufferLen > bytesLimit)
{
sendRes = await m_context.SendAsync(RawBuffer, RawBufferStart, bytesLimit).ConfigureAwait(false);
RawBufferLen -= bytesLimit;
RawBufferStart += bytesLimit;
if (sendRes)
{
RawBufferLen -= bytesLimit;
RawBufferStart += bytesLimit;
}
}
else
{
sendRes = await m_context.SendAsync(RawBuffer, RawBufferStart, RawBufferLen).ConfigureAwait(false);
RawBufferLen = 0;
if (sendRes)
RawBufferLen = 0;
}
if (!sendRes)
{
if (m_context.CanSend())
{
m_context.ContinueSendResponse(true);
return;
}
RawBuffer = null;
Sent = true;
return;
@ -409,7 +438,7 @@ namespace OSHttpServer
}
if (RawBufferLen > 0)
{
m_context.ContinueSendResponse();
m_context.ContinueSendResponse(false);
return;
}
}
@ -417,7 +446,19 @@ namespace OSHttpServer
if (m_body != null)
m_body.Dispose();
Sent = true;
m_context.ReqResponseSent(requestID, Connection);
m_context.EndSendResponse(requestID, Connection);
}
private int CheckBandwidth(int request, int bytesLimit)
{
if(request > bytesLimit)
request = bytesLimit;
var args = new BandWitdhEventArgs(request);
BandWitdhEvent?.Invoke(this, args);
if(args.Result > 8196)
return args.Result;
return 8196;
}
public void Clear()

View File

@ -23,7 +23,7 @@ namespace OSHttpServer
int contextID {get;}
int TimeoutKeepAlive {get; set; }
int MaxPipeRequests{get; set; }
int MaxRequests{get; set; }
bool CanSend();
bool IsSending();
@ -92,11 +92,11 @@ namespace OSHttpServer
HTTPNetworkContext GiveMeTheNetworkStreamIKnowWhatImDoing();
void StartSendResponse(HttpResponse response);
void ContinueSendResponse();
void ReqResponseAboutToSend(uint requestID);
void ReqResponseSent(uint requestID, ConnectionType connection);
void ContinueSendResponse(bool notThrottled);
void EndSendResponse(uint requestID, ConnectionType connection);
bool TrySendResponse(int limit);
}
public class HTTPNetworkContext
{
public NetworkStream Stream;
@ -143,4 +143,6 @@ namespace OSHttpServer
}
}
}

View File

@ -154,12 +154,5 @@ namespace OSHttpServer
/// </summary>
/// <param name="cookies">The cookies.</param>
void SetCookies(RequestCookies cookies);
/// <summary>
/// Create a response object.
/// </summary>
/// <param name="context">Context for the connected client.</param>
/// <returns>A new <see cref="IHttpResponse"/>.</returns>
//IHttpResponse CreateResponse(IHttpClientContext context);
}
}

View File

@ -26,11 +26,13 @@ namespace OSHttpServer
/// </example>
public interface IHttpResponse
{
event EventHandler<BandWitdhEventArgs> BandWitdhEvent;
/// <summary>
/// The body stream is used to cache the body contents
/// before sending everything to the client. It's the simplest
/// way to serve documents.
/// </summary>
///
Stream Body { get; }
byte[] RawBuffer { get; set; }
int RawBufferStart { get; set; }
@ -142,4 +144,19 @@ namespace OSHttpServer
/// </summary>
KeepAlive
}
public class BandWitdhEventArgs : EventArgs
{
/// <summary>
/// Gets received request.
/// </summary>
public int Result;
public int Request;
public BandWitdhEventArgs(int request)
{
Request = request;
Result = request;
}
}
}