Review Board 1.7.16


Add a serializer interface to the threadpool

Review Request #2323 - Created Feb. 8, 2013 and submitted

David Lee
/trunk
Reviewers
asterisk-dev
mmichelson
Asterisk
This patch adds the ability to create a serializer from a thread pool. A
serializer is a ast_taskprocessor with the same contract as a default
taskprocessor (tasks execute serially) except instead of executing out
of a dedicated thread, execution occurs in a thread from a
ast_threadpool. Think of it as a lightweight thread.

While it guarantees that each task will complete before executing the
next, there is no guarantee as to which thread from the pool individual
tasks will execute. This normally only matters if your code relys on
thread specific information, such as thread locals.

This patch also fixes a bug in how the 'was_empty' parameter is computed
for the push callback.
Unit testing.

Changes between revision 1 and 3

1 2 3
1 2 3

  1. /trunk/main/taskprocessor.c: Loading...
  2. /trunk/main/threadpool.c: Loading...
  3. /trunk/tests/test_taskprocessor.c: Loading...
  4. /trunk/tests/test_threadpool.c: Loading...
/trunk/main/taskprocessor.c
Diff Revision 1 Diff Revision 3
[20] 72 lines
[+20] [+] struct ast_taskprocessor {
73
	/*! \brief Taskprocessor queue */
73
	/*! \brief Taskprocessor queue */
74
	AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
74
	AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
75
	/*! \brief Taskprocessor singleton list entry */
75
	/*! \brief Taskprocessor singleton list entry */
76
	AST_LIST_ENTRY(ast_taskprocessor) list;
76
	AST_LIST_ENTRY(ast_taskprocessor) list;
77
	struct ast_taskprocessor_listener *listener;
77
	struct ast_taskprocessor_listener *listener;
78
	/*! Indicates if the taskprocessor is in the process of shuting down */

   
79
	unsigned int shutting_down:1;

   
80
	/*! Indicates if the taskprocessor is currently executing a task */
78
	/*! Indicates if the taskprocessor is currently executing a task */
81
	unsigned int executing:1;
79
	unsigned int executing:1;
82
};
80
};
83

    
   
81

   
84
/*!
82
/*!
[+20] [20] 112 lines
[+20] [+] static int default_listener_start(struct ast_taskprocessor_listener *listener)
197

    
   
195

   
198
static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
196
static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
199
{
197
{
200
	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
198
	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
201

    
   
199

   

    
   
200
	ast_assert(!pvt->dead);

    
   
201

   
202
	if (was_empty) {
202
	if (was_empty) {
203
		default_tps_wake_up(pvt, 0);
203
		default_tps_wake_up(pvt, 0);
204
	}
204
	}
205
}
205
}
206

    
   
206

   
[+20] [20] 240 lines
[+20] [+] static void tps_taskprocessor_destroy(void *tps)
447
static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
447
static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
448
{
448
{
449
	struct tps_task *task;
449
	struct tps_task *task;
450
	SCOPED_AO2LOCK(lock, tps);
450
	SCOPED_AO2LOCK(lock, tps);
451

    
   
451

   
452
	if (tps->shutting_down) {

   
453
		return NULL;

   
454
	}

   
455

    
   

   
456
	if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
452
	if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
457
		tps->tps_queue_size--;
453
		tps->tps_queue_size--;
458
	}
454
	}
459
	return task;
455
	return task;
460
}
456
}
[+20] [20] 211 lines
[+20] [+] int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
672

    
   
668

   
673
	ao2_lock(tps);
669
	ao2_lock(tps);
674
	tps->executing = 1;
670
	tps->executing = 1;
675
	ao2_unlock(tps);
671
	ao2_unlock(tps);
676

    
   
672

   
677
	if (!(t = tps_taskprocessor_pop(tps))) {
673
	t = tps_taskprocessor_pop(tps);
678
		ao2_lock(tps);

   
679
		/* We need to check size in the same critical section where we

   
680
		 * reset the executing bit. Avoids a race condition where a task

   
681
		 * is pushed right after we pop an empty stack.

   
682
		 */

   
683
		tps->executing = 0;

   
684
		size = tps_taskprocessor_depth(tps);

   
685
		ao2_unlock(tps);

   
686
		return size > 0;

   
687
	}

   
688

    
   
674

   

    
   
675
	if (t) {
689
	t->execute(t->datap);
676
		t->execute(t->datap);
690

    
   

   
691
	tps_task_free(t);
677
		tps_task_free(t);

    
   
678
	}
692

    
   
679

   
693
	ao2_lock(tps);
680
	ao2_lock(tps);

    
   
681
	/* We need to check size in the same critical section where we reset the

    
   
682
	 * executing bit. Avoids a race condition where a task is pushed right

    
   
683
	 * after we pop an empty stack.

    
   
684
	 */
694
	tps->executing = 0;
685
	tps->executing = 0;
695
	size = tps_taskprocessor_depth(tps);
686
	size = tps_taskprocessor_depth(tps);
696
	if (tps->stats) {
687
	/* If we executed a task, bump the stats */

    
   
688
	if (t && tps->stats) {
697
		tps->stats->_tasks_processed_count++;
689
		tps->stats->_tasks_processed_count++;
698
		if (size > tps->stats->max_qsize) {
690
		if (size > tps->stats->max_qsize) {
699
			tps->stats->max_qsize = size;
691
			tps->stats->max_qsize = size;
700
		}
692
		}
701
	}
693
	}
702
	ao2_unlock(tps);
694
	ao2_unlock(tps);
703

    
   
695

   
704
	if (size == 0 && tps->listener->callbacks->emptied) {
696
	/* If we executed a task, check for the transition to empty */

    
   
697
	if (t && size == 0 && tps->listener->callbacks->emptied) {
705
		tps->listener->callbacks->emptied(tps->listener);
698
		tps->listener->callbacks->emptied(tps->listener);
706
		return 0;

   
707
	}
699
	}
708
	return 1;
700
	return size > 0;
709
}
701
}
/trunk/main/threadpool.c
Diff Revision 1 Diff Revision 3
 
/trunk/tests/test_taskprocessor.c
Diff Revision 1 Diff Revision 3
 
/trunk/tests/test_threadpool.c
Diff Revision 1 Diff Revision 3
 
  1. /trunk/main/taskprocessor.c: Loading...
  2. /trunk/main/threadpool.c: Loading...
  3. /trunk/tests/test_taskprocessor.c: Loading...
  4. /trunk/tests/test_threadpool.c: Loading...

https://reviewboard.asterisk.org/ runs on a server provided by Digium, Inc. and uses bandwidth donated to the open source Asterisk community by API Digital Communications in Huntsville, AL USA.
Please report problems with this site to asteriskteam@digium.com.