Hangfire.IO – More Complex Application

Background

With our first Application out of the way, let us develop a slightly more involved console App.

Why?

Why such a fascination with HangFire.IO?

One of the tools that we use to monitor SQL Server is Sql Server Management Studio – Activity Monitor.

SQL Server Management Studio ( SSMS )

Activity Monitor

Image

Explanation

  1. delete top (1) from [HangFire].JobQueue with (readpast, updlock, rowlock)
    output DELETED.Id, DELETED.JobId, DELETED.Queue
    where (FetchedAt is null or FetchedAt < DATEADD(second, @timeout, GETUTCDATE()))
    and Queue in (@queues1)

    • 286 per minute
  2. update [HangFire].Server set LastHeartbeat = @now where Id = @id
    • 48 per minute

 

Lab

Data Model

Outline

We have two tables, homework.task and homework.taskDetail.

The homework.task will have the following columns

  1. taskID
  2. taskLabel
  3. interval
  4. cronExpression

And the homework.taskDetail table will have the following columns

  1. taskID
  2. processedCount

The application will store task information in the homework.task table.

Based on time schedule, we increment the processedCount column in the homework.taskDetail table.

 

Database Diagram

SQL Script

Table

Table – homework.task


-- Make sure the [homework] schema is present before any object is placed in it.
-- The CREATE SCHEMA statement is issued through exec() so it can sit inside the IF.
IF SCHEMA_ID('homework') IS NULL
	EXEC('create schema [homework] authorization [dbo]')
GO

-- Create [homework].[task] only when no object of that name exists yet,
-- which keeps the script safe to re-run.
IF OBJECT_ID('[homework].[task]') IS NULL
	CREATE TABLE [homework].[task]
	(
		-- Task identifier supplied by the caller (not an identity column).
		[taskID]         INT         NOT NULL,
		[taskLabel]      VARCHAR(60) NOT NULL,
		-- NOTE(review): the unit of [interval] is not stated here - confirm with the application.
		[interval]       INT         NOT NULL,
		[cronExpression] VARCHAR(60) NOT NULL,

		-- Stamped with the server's local time when the row is inserted.
		[dateAdded]      DATETIME    NOT NULL
			CONSTRAINT [constraintHWTaskDefaultDateAdded] DEFAULT GETDATE(),

		-- Stays NULL until a writer sets it.
		[dateUpdated]    DATETIME    NULL,

		CONSTRAINT [PK_HWTask] PRIMARY KEY ([taskID])
	)
GO



 

Table – homework.taskDetail


-- Make sure the [homework] schema is present before any object is placed in it.
IF SCHEMA_ID('homework') IS NULL
	EXEC('create schema [homework] authorization [dbo]')
GO

-- Create [homework].[taskDetail] only when no object of that name exists yet.
-- One row per task: [taskID] is both the primary key and a foreign key to [homework].[task].
IF OBJECT_ID('[homework].[taskDetail]') IS NULL
	CREATE TABLE [homework].[taskDetail]
	(
		[taskID]         INT      NOT NULL,
		[processedCount] INT      NOT NULL,

		-- Stamped with the server's local time when the row is inserted.
		[dateAdded]      DATETIME NOT NULL
			CONSTRAINT [constraintHWTaskDetailDefaultDateAdded] DEFAULT GETDATE(),

		-- Stays NULL until a writer sets it.
		[dateUpdated]    DATETIME NULL,

		CONSTRAINT [PK_HWTaskDetail] PRIMARY KEY ([taskID]),

		CONSTRAINT [FK_HWTaskDetail] FOREIGN KEY ([taskID])
			REFERENCES [homework].[task] ([taskID])
	)
GO



Stored Procedure

SP – homework.usp_taskInit



-- Make sure the [homework] schema is present.
if schema_id('homework') is null
begin

	exec('create schema [homework] authorization [dbo]')

end
go

-- Create an empty shell first so the ALTER below works on both first and repeat runs.
if object_id('[homework].[usp_taskInit]') is null
begin

	exec('create procedure [homework].[usp_taskInit] as ')

end
go

-- Purpose : reset the lab data by emptying [homework].[taskDetail] and [homework].[task].
-- Params  : none.
-- Notes   : both statements run inside a single transaction, so the reset is
--           all-or-nothing; with XACT_ABORT on, any run-time error rolls the
--           whole transaction back.
alter procedure [homework].[usp_taskInit]
as

begin

	set nocount on;
	set XACT_ABORT on;

	begin transaction;

		-- Child table first; it references [homework].[task] through [FK_HWTaskDetail].
		truncate table [homework].[taskDetail];

		-- The parent is the target of a foreign key, so it is emptied with DELETE
		-- (intentionally without a WHERE clause: every row goes).
		delete from [homework].[task];

	commit transaction;

end
go



 

SP – homework.usp_taskAdd


-- Make sure the [homework] schema is present.
if schema_id('homework') is null
begin

	exec('create schema [homework] authorization [dbo]')

end
go

-- Create an empty shell first so the ALTER below works on both first and repeat runs.
if object_id('[homework].[usp_taskAdd]') is null
begin

	exec('create procedure [homework].[usp_taskAdd] as ')

end
go

-- Purpose : insert a task, or update it when a row with the same taskID already exists.
-- Params  : @taskID          key of the task to add or update
--           @taskLabel       label stored in [taskLabel]
--           @interval        value stored in [interval]
--           @cronExpression  value stored in [cronExpression]
-- Notes   : on insert [dateAdded] is set to getdate(); on update [dateUpdated] is.
alter procedure [homework].[usp_taskAdd]
	  @taskID		   int 
	, @taskLabel	   varchar(60)
	, @interval		   int 

	, @cronExpression  varchar(60)

as

begin

	set nocount on;
	set XACT_ABORT on;

	-- HOLDLOCK keeps the key range locked between the match test and the write,
	-- so two concurrent calls for the same taskID cannot both take the INSERT
	-- branch and collide on [PK_HWTask].
	MERGE [homework].[task] WITH (HOLDLOCK) AS tblTarget
	USING 
			(
				SELECT 
						  [taskID] = @taskID
						, [taskLabel] = @taskLabel
						, [interval] = @interval
						, [cronExpression] = @cronExpression
						, [dateAdded] = getdate()
			) AS tblSource
	ON 
			(
				    (tblTarget.[taskID] = tblSource.[taskID] )
			)
	WHEN MATCHED THEN
		UPDATE 
		SET 
			  tblTarget.[taskLabel] = tblSource.[taskLabel]
			, tblTarget.[interval] = tblSource.[interval]
			, tblTarget.[cronExpression] = tblSource.[cronExpression]
			, tblTarget.[dateUpdated] = getdate()

	WHEN NOT MATCHED BY TARGET THEN
		INSERT 
		(
			  [taskID]
			, [taskLabel]
			, [interval]
			, [cronExpression]
			, [dateAdded]
		)
		VALUES 
		(
			  tblSource.[taskID]
			, tblSource.[taskLabel]
			, tblSource.[interval]
			, tblSource.[cronExpression]
			, tblSource.[dateAdded]
		)
	
	;
end
go




SP – homework.usp_taskDetailAdd



-- Make sure the [homework] schema is present.
if schema_id('homework') is null
begin

	exec('create schema [homework] authorization [dbo]')

end
go

-- Create an empty shell first so the ALTER below works on both first and repeat runs.
if object_id('[homework].[usp_taskDetailAdd]') is null
begin

	exec('create procedure [homework].[usp_taskDetailAdd] as ')

end
go

-- Purpose : record one more processed run for a task.
--           First call for a taskID inserts a row with processedCount = 1;
--           later calls add 1 to processedCount and stamp [dateUpdated].
-- Params  : @taskID  key of the task whose counter is bumped.
alter procedure [homework].[usp_taskDetailAdd]
	  @taskID		   int 

as

begin

	set nocount on;
	set XACT_ABORT on;

	-- HOLDLOCK keeps the key range locked between the match test and the write,
	-- so two concurrent calls for the same taskID cannot both take the INSERT
	-- branch and collide on [PK_HWTaskDetail].
	MERGE [homework].[taskDetail] WITH (HOLDLOCK) AS tblTarget
	USING 
			(
				SELECT [taskID] = @taskID
			) AS tblSource
	ON 
			(
				    (tblTarget.[taskID] = tblSource.[taskID] )
			)
	WHEN MATCHED THEN
		UPDATE 
		SET 
			  tblTarget.[processedCount] = isNull(tblTarget.[processedCount], 0) + 1
			, tblTarget.[dateUpdated] = getdate()

	WHEN NOT MATCHED BY TARGET THEN
		INSERT 
		(
			  [taskID]
			, [processedCount]
		)
		VALUES 
		(
			  tblSource.[taskID]
			, 1
		)
	
	;

end
go



Function

Function – homework.itvf_taskDetail


-- Make sure the [homework] schema is present.
IF SCHEMA_ID('homework') IS NULL
	EXEC('create schema [homework] authorization [dbo]')
GO

-- Create a placeholder function when none exists, so the ALTER below always has a target.
IF OBJECT_ID('[homework].[itvf_taskDetail]') IS NULL
	EXEC('create function [homework].[itvf_taskDetail]() returns table return ( select [shell] = 1/0 ) ')
GO

-- Inline table-valued function: one row per task, with that task's
-- processing detail alongside. The detail columns are NULL for a task
-- that has no [homework].[taskDetail] row.
ALTER FUNCTION [homework].[itvf_taskDetail]()
RETURNS TABLE
RETURN
(
	SELECT
		t.[taskID],
		t.[taskLabel],
		t.[interval],
		t.[cronExpression],
		-- [dateAdded] / [dateUpdated] come from the detail row, not from the task row.
		td.[processedCount],
		td.[dateAdded],
		td.[dateUpdated]
	FROM [homework].[task] AS t
	LEFT JOIN [homework].[taskDetail] AS td
		ON td.[taskID] = t.[taskID]
)
GO


C# Code


using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;

using System.Data;
using System.Data.SqlClient;
using System.Configuration;

using Hangfire;
using Hangfire.SqlServer;


namespace consoleApp
{
    // Console host for the Hangfire lab.
    //
    // Main reads configuration, then starts a worker thread. The worker starts a
    // Hangfire BackgroundJobServer, registers one delayed job plus eight recurring
    // jobs, and polls once a second until the configured end time is reached.
    class Program
    {
        // Worker thread that hosts the Hangfire server and the polling loop.
        static Thread t;

        // Wall-clock time at which the polling loop stops; set by getAppConfig().
        static DateTime dtExpectedEndTS;

        // Run length used when app.config has no usable "maxDurationInMinutes".
        const int MAX_TIME_ELAPSED_MINUTES = 60;

        const string FORMAT_MAX_TIME_ELAPSED = "Exiting after {0} seconds";
        const string FORMAT_CRONTAB_DISPLAY = "TASK ID {0}, CRON EXPRESSION {1}";

        static int iMaxDurationInMinutes = MAX_TIME_ELAPSED_MINUTES;

        // 1 second
        static TimeSpan tsPeriodicCheck = new TimeSpan(0, 0, 1);

        // Pause taken on the main thread before it joins the worker.
        static TimeSpan tsMainThreadWait = TimeSpan.FromSeconds(30);

        static public bool IsCancelled { get; set; }

        // Name of the app.config connection string for the homework (demo) database.
        const String connectionStringDemoID = "connectionStringDemo";
        static String conStringDemo = null;

        // Name of the app.config connection string handed to Hangfire.
        const String connectionStringHFID = "connectionStringHF";

        // Captured once at type initialisation; dtStartTSAsString depends on it,
        // so the declaration order of these two fields matters.
        static DateTime dtStartTS = (DateTime.Now);

        static String dtStartTSAsString = (dtStartTS).ToString(@"hh\:mm\:ss");


        // Entry point: load configuration, start the worker thread, wait for it to end.
        static void Main(string[] args)
        {
            getDBConnectionString();

            getAppConfig();

            IsCancelled = false;

            // Instantiate and start the worker thread
            t = new Thread(threadBackgroundJobScheduler);
            t.Start();

            Console.WriteLine("Hangfire Server started.");

            Console.WriteLine("On Main thread, wait 30 seconds");

            // Thread.Sleep(int) takes milliseconds; a TimeSpan is used so the
            // pause really is the 30 seconds announced above.
            Thread.Sleep(tsMainThreadWait);

            // wait for thread to finish
            t.Join();
        }

        // Thread body: everything Hangfire-related happens in useHangFire().
        static void threadBackgroundJobScheduler()
        {
            useHangFire();
        }

        // Clears the homework tables by calling [homework].[usp_taskInit].
        static void scheduledTaskInitDB()
        {
            string dbSQL = "[homework].[usp_taskInit]";

            using (SqlConnection sqlConnection = new SqlConnection(conStringDemo))
            using (SqlCommand sqlCommand = new SqlCommand(dbSQL, sqlConnection))
            {
                sqlCommand.CommandType = CommandType.StoredProcedure;

                sqlConnection.Open();
                sqlCommand.ExecuteNonQuery();
            }
        }

        // Saves one task definition through [homework].[usp_taskAdd].
        static void scheduledTaskIndividualDB(scheduleTask objScheduleTask)
        {
            string dbSQL = "[homework].[usp_taskAdd]";

            using (SqlConnection sqlConnection = new SqlConnection(conStringDemo))
            using (SqlCommand sqlCommand = new SqlCommand(dbSQL, sqlConnection))
            {
                sqlCommand.CommandType = CommandType.StoredProcedure;

                sqlCommand.Parameters.Add("@taskID", SqlDbType.Int).Value = objScheduleTask.taskID;
                sqlCommand.Parameters.Add("@taskLabel", SqlDbType.VarChar).Value = objScheduleTask.taskLabel;
                sqlCommand.Parameters.Add("@interval", SqlDbType.Int).Value = objScheduleTask.interval;
                sqlCommand.Parameters.Add("@cronExpression", SqlDbType.VarChar).Value = objScheduleTask.cronExpression;

                sqlConnection.Open();
                sqlCommand.ExecuteNonQuery();
            }
        }

        // Builds a scheduleTask for (id, interval), prints it, stores it in the
        // database and registers it with Hangfire as a recurring job.
        static void scheduleTaskIndividual(int id, int interval)
        {
            scheduleTask objScheduleTask = new scheduleTask(id, interval);
            String taskLabel = objScheduleTask.taskLabel;
            String strCronExpression = objScheduleTask.cronExpression;

            // NOTE(review): the format text reads "TASK ID" but the label is what gets printed - confirm intent.
            String strTaskEntry = String.Format(FORMAT_CRONTAB_DISPLAY, taskLabel, strCronExpression);

            Console.WriteLine(strTaskEntry);

            scheduledTaskIndividualDB(objScheduleTask);

            // The task label doubles as the recurring job's identifier.
            RecurringJob.AddOrUpdate
                (
                      taskLabel
                    , () => backgroundTask.processTask(id)
                    , strCronExpression
                );
        }

        // Resets the database, then registers the eight lab tasks (id, interval).
        static void scheduleTasks()
        {
            scheduledTaskInitDB();

            scheduleTaskIndividual(1, 1);

            scheduleTaskIndividual(2, 3);

            scheduleTaskIndividual(3, 5);

            scheduleTaskIndividual(4, 10);

            scheduleTaskIndividual(5, 15);

            scheduleTaskIndividual(6, 30);

            scheduleTaskIndividual(7, 45);

            scheduleTaskIndividual(8, 60);
        }

        // Starts the Hangfire server, schedules the jobs and keeps the server
        // alive until dtExpectedEndTS is reached or IsCancelled is set.
        static void useHangFire()
        {
            // get Current Time
            DateTime dtStartTime = DateTime.Now;

            // NOTE(review): this passes the connection string *name*; presumably
            // Hangfire resolves it from app.config - confirm.
            GlobalConfiguration.Configuration.UseSqlServerStorage(connectionStringHFID);

            using (var server = new BackgroundJobServer())
            {
                BackgroundJob.Schedule(() => backgroundTask.sayHello(), TimeSpan.FromSeconds(2));

                scheduleTasks();

                while (IsCancelled == false)
                {
                    DateTime dtCurrent = DateTime.Now;

                    if (dtCurrent >= dtExpectedEndTS)
                    {
                        IsCancelled = true;

                        TimeSpan tsElapsed = dtCurrent - dtStartTime;

                        Console.WriteLine(String.Format(FORMAT_MAX_TIME_ELAPSED, tsElapsed.TotalSeconds));

                        break;
                    }

                    // wait a little longer
                    Thread.Sleep(tsPeriodicCheck);

                } // while not cancelled

            } // using

        } // useHangFire()


        // Loads the demo connection string from app.config into conStringDemo.
        // Throws System.Exception when the entry is missing.
        static private void getDBConnectionString()
        {
            ConnectionStringSettings objSettings = ConfigurationManager.ConnectionStrings[connectionStringDemoID];

            if (objSettings == null)
            {
                throw new System.Exception("connectionStringDemoID not found in app.config");
            }

            conStringDemo = objSettings.ConnectionString;

        } // getDBConnectionString()


        // Reads "maxDurationInMinutes" from appSettings and derives dtExpectedEndTS
        // from the start timestamp. A missing or non-numeric value falls back to
        // MAX_TIME_ELAPSED_MINUTES.
        static private void getAppConfig()
        {
            Console.WriteLine("Start Date :- " + dtStartTSAsString);

            iMaxDurationInMinutes = MAX_TIME_ELAPSED_MINUTES;

            String strMaxDurationInMinutes = ConfigurationManager.AppSettings["maxDurationInMinutes"];

            if (strMaxDurationInMinutes != null)
            {
                int iParsed;

                // TryParse reports failure through its return value (it does not
                // throw) and zeroes the out argument, so parse into a scratch
                // variable and keep the default when the text is not a number.
                if (Int32.TryParse(strMaxDurationInMinutes, out iParsed))
                {
                    iMaxDurationInMinutes = iParsed;
                }
                else
                {
                    Console.WriteLine
                        (
                              "maxDurationInMinutes is not a valid integer; using default of "
                            + MAX_TIME_ELAPSED_MINUTES
                            + " minutes"
                        );
                }
            }

            // Same origin (dtStartTS) whether or not the setting was present.
            System.TimeSpan tsDurationInMins = new System.TimeSpan(0, iMaxDurationInMinutes, 0);

            dtExpectedEndTS = dtStartTS.Add(tsDurationInMins);

            String dtExpectedEndTSAsString = (dtExpectedEndTS).ToString(@"hh\:mm\:ss");

            Console.WriteLine("Expected End Date :- " + dtExpectedEndTSAsString);

        } // getAppConfig()


    } // class program
	
	
	// Job bodies executed by Hangfire workers.
	class backgroundTask
	{

		static String FORMAT_DATETIME = "{0:ddd, MMM d, yyyy  hh:mm tt}";

		// Name of the app.config connection string for the homework (demo) database.
		static String connectionStringDemoID = "connectionStringDemo";

		// Filled in on first use by getDBConnectionString().
		static String conStringDemo = null;


		// Writes a time-stamped greeting to the console. Hangfire retries are disabled.
		[AutomaticRetry(Attempts = 0)]
		static public void sayHello()
		{
			String strNow = String.Format(FORMAT_DATETIME, DateTime.Now);

			Console.WriteLine("At " + strNow + " ... Saying Hello");

		} //sayHello

		// Loads the demo connection string once; later calls return immediately.
		// Throws System.Exception when app.config has no such entry.
		static private void getDBConnectionString()
		{
			if (conStringDemo != null)
			{
				return;
			}

			ConnectionStringSettings objEntry = ConfigurationManager.ConnectionStrings[connectionStringDemoID];

			if (objEntry == null)
			{
				throw new System.Exception("connectionStringDemoID not found in app.config");
			}

			conStringDemo = objEntry.ConnectionString;

		} //getDBConnectionString()


		// Calls [homework].[usp_taskDetailAdd] for the given task.
		// Hangfire retries are disabled and concurrent execution is disabled
		// with a ten-minute timeout (see the attributes).
		[AutomaticRetry(Attempts = 0)]
		[DisableConcurrentExecution(timeoutInSeconds: 10 * 60)]
		static public void processTask(int taskID)
		{
			// No-op once the connection string has been loaded.
			getDBConnectionString();

			using (SqlConnection objConnection = new SqlConnection(conStringDemo))
			using (SqlCommand objCommand = new SqlCommand("[homework].[usp_taskDetailAdd]", objConnection))
			{
				objCommand.CommandType = CommandType.StoredProcedure;

				objCommand.Parameters.Add("@taskID", SqlDbType.Int).Value = taskID;

				objConnection.Open();
				objCommand.ExecuteNonQuery();
			}

		} //processTask


	}
	
}




 

 

Run

Here is the output from running the app.

Run Output

 

SQL Track

Code


-- Track progress: one row per task with its run counter.
-- The detail columns are NULL for a task with no [homework].[taskDetail] row.
select
		  tblTD.[taskID]
		, tblTD.[taskLabel]
		, tblTD.[interval]
		, tblTD.[cronExpression]
		, tblTD.[processedCount]
		, tblTD.[dateAdded]
		, tblTD.[dateUpdated]

from   [homework].[itvf_taskDetail]() tblTD

-- deterministic row order between runs
order by
		  tblTD.[taskID]

SQL Output

 

Source Code

GitHub

As always, posted to GitHub.

Here is the URL.

 

Summary

Will come back and share our findings in terms of reliability and performance.

It will be a bit long and so will save it for another post.

Reference

  1. How to write your first multi-threaded application with c#
    Link
  2. How do I get the time difference between two DateTime objects using C#?
    Link

 

One thought on “Hangfire.IO – More Complex Application”

Leave a Reply

Please log in using one of these methods to post your comment:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s