Review Board 1.7.16


Add a serializer interface to the threadpool

Review Request #2323 - Created Feb. 8, 2013 and submitted

David Lee
/trunk
Reviewers
asterisk-dev
mmichelson
Asterisk
This patch adds the ability to create a serializer from a thread pool. A
serializer is a ast_taskprocessor with the same contract as a default
taskprocessor (tasks execute serially) except instead of executing out
of a dedicated thread, execution occurs in a thread from a
ast_threadpool. Think of it as a lightweight thread.

While it guarantees that each task will complete before executing the
next, there is no guarantee as to which thread from the pool individual
tasks will execute. This normally only matters if your code relys on
thread specific information, such as thread locals.

This patch also fixes a bug in how the 'was_empty' parameter is computed
for the push callback.
Unit testing.

Changes between revision 1 and 2

1 2 3
1 2 3

  1. /trunk/main/taskprocessor.c: Loading...
  2. /trunk/tests/test_taskprocessor.c: Loading...
  3. /trunk/tests/test_threadpool.c: Loading...
/trunk/main/taskprocessor.c
Diff Revision 1 Diff Revision 2
1
/*
1
/*
2
 * Asterisk -- An open source telephony toolkit.
2
 * Asterisk -- An open source telephony toolkit.
3
 *
3
 *
4
 * Copyright (C) 2007-2013, Digium, Inc.
4
 * Copyright (C) 2007-2013, Digium, Inc.
5
 *
5
 *
6
 * Dwayne M. Hubbard <dhubbard@digium.com>
6
 * Dwayne M. Hubbard <dhubbard@digium.com>
7
 *
7
 *
8
 * See http://www.asterisk.org for more information about
8
 * See http://www.asterisk.org for more information about
9
 * the Asterisk project. Please do not directly contact
9
 * the Asterisk project. Please do not directly contact
10
 * any of the maintainers of this project for assistance;
10
 * any of the maintainers of this project for assistance;
11
 * the project provides a web site, mailing lists and IRC
11
 * the project provides a web site, mailing lists and IRC
12
 * channels for your use.
12
 * channels for your use.
13
 *
13
 *
14
 * This program is free software, distributed under the terms of
14
 * This program is free software, distributed under the terms of
15
 * the GNU General Public License Version 2. See the LICENSE file
15
 * the GNU General Public License Version 2. See the LICENSE file
16
 * at the top of the source tree.
16
 * at the top of the source tree.
17
 */
17
 */
18

    
   
18

   
19
/*!
19
/*!
20
 * \file
20
 * \file
21
 * \brief Maintain a container of uniquely-named taskprocessor threads that can be shared across modules.
21
 * \brief Maintain a container of uniquely-named taskprocessor threads that can be shared across modules.
22
 *
22
 *
23
 * \author Dwayne Hubbard <dhubbard@digium.com>
23
 * \author Dwayne Hubbard <dhubbard@digium.com>
24
 */
24
 */
25

    
   
25

   
26
/*** MODULEINFO
26
/*** MODULEINFO
27
	<support_level>core</support_level>
27
	<support_level>core</support_level>
28
 ***/
28
 ***/
29

    
   
29

   
30
#include "asterisk.h"
30
#include "asterisk.h"
31

    
   
31

   
32
ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
32
ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33

    
   
33

   
34
#include "asterisk/_private.h"
34
#include "asterisk/_private.h"
35
#include "asterisk/module.h"
35
#include "asterisk/module.h"
36
#include "asterisk/time.h"
36
#include "asterisk/time.h"
37
#include "asterisk/astobj2.h"
37
#include "asterisk/astobj2.h"
38
#include "asterisk/cli.h"
38
#include "asterisk/cli.h"
39
#include "asterisk/taskprocessor.h"
39
#include "asterisk/taskprocessor.h"
40

    
   
40

   
41
/*!
41
/*!
42
 * \brief tps_task structure is queued to a taskprocessor
42
 * \brief tps_task structure is queued to a taskprocessor
43
 *
43
 *
44
 * tps_tasks are processed in FIFO order and freed by the taskprocessing
44
 * tps_tasks are processed in FIFO order and freed by the taskprocessing
45
 * thread after the task handler returns.  The callback function that is assigned
45
 * thread after the task handler returns.  The callback function that is assigned
46
 * to the execute() function pointer is responsible for releasing datap resources if necessary.
46
 * to the execute() function pointer is responsible for releasing datap resources if necessary.
47
 */
47
 */
48
struct tps_task {
48
struct tps_task {
49
	/*! \brief The execute() task callback function pointer */
49
	/*! \brief The execute() task callback function pointer */
50
	int (*execute)(void *datap);
50
	int (*execute)(void *datap);
51
	/*! \brief The data pointer for the task execute() function */
51
	/*! \brief The data pointer for the task execute() function */
52
	void *datap;
52
	void *datap;
53
	/*! \brief AST_LIST_ENTRY overhead */
53
	/*! \brief AST_LIST_ENTRY overhead */
54
	AST_LIST_ENTRY(tps_task) list;
54
	AST_LIST_ENTRY(tps_task) list;
55
};
55
};
56

    
   
56

   
57
/*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */
57
/*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */
58
struct tps_taskprocessor_stats {
58
struct tps_taskprocessor_stats {
59
	/*! \brief This is the maximum number of tasks queued at any one time */
59
	/*! \brief This is the maximum number of tasks queued at any one time */
60
	unsigned long max_qsize;
60
	unsigned long max_qsize;
61
	/*! \brief This is the current number of tasks processed */
61
	/*! \brief This is the current number of tasks processed */
62
	unsigned long _tasks_processed_count;
62
	unsigned long _tasks_processed_count;
63
};
63
};
64

    
   
64

   
65
/*! \brief A ast_taskprocessor structure is a singleton by name */
65
/*! \brief A ast_taskprocessor structure is a singleton by name */
66
struct ast_taskprocessor {
66
struct ast_taskprocessor {
67
	/*! \brief Friendly name of the taskprocessor */
67
	/*! \brief Friendly name of the taskprocessor */
68
	const char *name;
68
	const char *name;
69
	/*! \brief Taskprocessor statistics */
69
	/*! \brief Taskprocessor statistics */
70
	struct tps_taskprocessor_stats *stats;
70
	struct tps_taskprocessor_stats *stats;
71
	/*! \brief Taskprocessor current queue size */
71
	/*! \brief Taskprocessor current queue size */
72
	long tps_queue_size;
72
	long tps_queue_size;
73
	/*! \brief Taskprocessor queue */
73
	/*! \brief Taskprocessor queue */
74
	AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
74
	AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
75
	/*! \brief Taskprocessor singleton list entry */
75
	/*! \brief Taskprocessor singleton list entry */
76
	AST_LIST_ENTRY(ast_taskprocessor) list;
76
	AST_LIST_ENTRY(ast_taskprocessor) list;
77
	struct ast_taskprocessor_listener *listener;
77
	struct ast_taskprocessor_listener *listener;
78
	/*! Indicates if the taskprocessor is in the process of shuting down */

   
79
	unsigned int shutting_down:1;

   
80
	/*! Indicates if the taskprocessor is currently executing a task */
78
	/*! Indicates if the taskprocessor is currently executing a task */
81
	unsigned int executing:1;
79
	unsigned int executing:1;
82
};
80
};
83

    
   
81

   
84
/*!
82
/*!
85
 * \brief A listener for taskprocessors
83
 * \brief A listener for taskprocessors
86
 *
84
 *
87
 * \since 12.0.0
85
 * \since 12.0.0
88
 *
86
 *
89
 * When a taskprocessor's state changes, the listener
87
 * When a taskprocessor's state changes, the listener
90
 * is notified of the change. This allows for tasks
88
 * is notified of the change. This allows for tasks
91
 * to be addressed in whatever way is appropriate for
89
 * to be addressed in whatever way is appropriate for
92
 * the module using the taskprocessor.
90
 * the module using the taskprocessor.
93
 */
91
 */
94
struct ast_taskprocessor_listener {
92
struct ast_taskprocessor_listener {
95
	/*! The callbacks the taskprocessor calls into to notify of state changes */
93
	/*! The callbacks the taskprocessor calls into to notify of state changes */
96
	const struct ast_taskprocessor_listener_callbacks *callbacks;
94
	const struct ast_taskprocessor_listener_callbacks *callbacks;
97
	/*! The taskprocessor that the listener is listening to */
95
	/*! The taskprocessor that the listener is listening to */
98
	struct ast_taskprocessor *tps;
96
	struct ast_taskprocessor *tps;
99
	/*! Data private to the listener */
97
	/*! Data private to the listener */
100
	void *user_data;
98
	void *user_data;
101
};
99
};
102

    
   
100

   
103
#define TPS_MAX_BUCKETS 7
101
#define TPS_MAX_BUCKETS 7
104
/*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */
102
/*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */
105
static struct ao2_container *tps_singletons;
103
static struct ao2_container *tps_singletons;
106

    
   
104

   
107
/*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition */
105
/*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition */
108
static ast_cond_t cli_ping_cond;
106
static ast_cond_t cli_ping_cond;
109

    
   
107

   
110
/*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition lock */
108
/*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition lock */
111
AST_MUTEX_DEFINE_STATIC(cli_ping_cond_lock);
109
AST_MUTEX_DEFINE_STATIC(cli_ping_cond_lock);
112

    
   
110

   
113
/*! \brief The astobj2 hash callback for taskprocessors */
111
/*! \brief The astobj2 hash callback for taskprocessors */
114
static int tps_hash_cb(const void *obj, const int flags);
112
static int tps_hash_cb(const void *obj, const int flags);
115
/*! \brief The astobj2 compare callback for taskprocessors */
113
/*! \brief The astobj2 compare callback for taskprocessors */
116
static int tps_cmp_cb(void *obj, void *arg, int flags);
114
static int tps_cmp_cb(void *obj, void *arg, int flags);
117

    
   
115

   
118
/*! \brief The task processing function executed by a taskprocessor */
116
/*! \brief The task processing function executed by a taskprocessor */
119
static void *tps_processing_function(void *data);
117
static void *tps_processing_function(void *data);
120

    
   
118

   
121
/*! \brief Destroy the taskprocessor when its refcount reaches zero */
119
/*! \brief Destroy the taskprocessor when its refcount reaches zero */
122
static void tps_taskprocessor_destroy(void *tps);
120
static void tps_taskprocessor_destroy(void *tps);
123

    
   
121

   
124
/*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> handler function */
122
/*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> handler function */
125
static int tps_ping_handler(void *datap);
123
static int tps_ping_handler(void *datap);
126

    
   
124

   
127
/*! \brief Remove the front task off the taskprocessor queue */
125
/*! \brief Remove the front task off the taskprocessor queue */
128
static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);
126
static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);
129

    
   
127

   
130
/*! \brief Return the size of the taskprocessor queue */
128
/*! \brief Return the size of the taskprocessor queue */
131
static int tps_taskprocessor_depth(struct ast_taskprocessor *tps);
129
static int tps_taskprocessor_depth(struct ast_taskprocessor *tps);
132

    
   
130

   
133
static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
131
static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
134
static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
132
static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
135

    
   
133

   
136
static struct ast_cli_entry taskprocessor_clis[] = {
134
static struct ast_cli_entry taskprocessor_clis[] = {
137
	AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
135
	AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
138
	AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
136
	AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
139
};
137
};
140

    
   
138

   
141
struct default_taskprocessor_listener_pvt {
139
struct default_taskprocessor_listener_pvt {
142
	pthread_t poll_thread;
140
	pthread_t poll_thread;
143
	ast_mutex_t lock;
141
	ast_mutex_t lock;
144
	ast_cond_t cond;
142
	ast_cond_t cond;
145
	int wake_up;
143
	int wake_up;
146
	int dead;
144
	int dead;
147
};
145
};
148

    
   
146

   
149

    
   
147

   
150
static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt, int should_die)
148
static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt, int should_die)
151
{
149
{
152
	SCOPED_MUTEX(lock, &pvt->lock);
150
	SCOPED_MUTEX(lock, &pvt->lock);
153
	pvt->wake_up = 1;
151
	pvt->wake_up = 1;
154
	pvt->dead = should_die;
152
	pvt->dead = should_die;
155
	ast_cond_signal(&pvt->cond);
153
	ast_cond_signal(&pvt->cond);
156
}
154
}
157

    
   
155

   
158
static int default_tps_idle(struct default_taskprocessor_listener_pvt *pvt)
156
static int default_tps_idle(struct default_taskprocessor_listener_pvt *pvt)
159
{
157
{
160
	SCOPED_MUTEX(lock, &pvt->lock);
158
	SCOPED_MUTEX(lock, &pvt->lock);
161
	while (!pvt->wake_up) {
159
	while (!pvt->wake_up) {
162
		ast_cond_wait(&pvt->cond, lock);
160
		ast_cond_wait(&pvt->cond, lock);
163
	}
161
	}
164
	pvt->wake_up = 0;
162
	pvt->wake_up = 0;
165
	return pvt->dead;
163
	return pvt->dead;
166
}
164
}
167

    
   
165

   
168
/*!
166
/*!
169
 * \brief Function that processes tasks in the taskprocessor
167
 * \brief Function that processes tasks in the taskprocessor
170
 * \internal
168
 * \internal
171
 */
169
 */
172
static void *tps_processing_function(void *data)
170
static void *tps_processing_function(void *data)
173
{
171
{
174
	struct ast_taskprocessor_listener *listener = data;
172
	struct ast_taskprocessor_listener *listener = data;
175
	struct ast_taskprocessor *tps = listener->tps;
173
	struct ast_taskprocessor *tps = listener->tps;
176
	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
174
	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
177
	int dead = 0;
175
	int dead = 0;
178

    
   
176

   
179
	while (!dead) {
177
	while (!dead) {
180
		if (!ast_taskprocessor_execute(tps)) {
178
		if (!ast_taskprocessor_execute(tps)) {
181
			dead = default_tps_idle(pvt);
179
			dead = default_tps_idle(pvt);
182
		}
180
		}
183
	}
181
	}
184
	return NULL;
182
	return NULL;
185
}
183
}
186

    
   
184

   
187
static int default_listener_start(struct ast_taskprocessor_listener *listener)
185
static int default_listener_start(struct ast_taskprocessor_listener *listener)
188
{
186
{
189
	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
187
	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
190

    
   
188

   
191
	if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener)) {
189
	if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener)) {
192
		return -1;
190
		return -1;
193
	}
191
	}
194

    
   
192

   
195
	return 0;
193
	return 0;
196
}
194
}
197

    
   
195

   
198
static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
196
static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
199
{
197
{
200
	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
198
	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
201

    
   
199

   

    
   
200
	ast_assert(!pvt->dead);

    
   
201

   
202
	if (was_empty) {
202
	if (was_empty) {
203
		default_tps_wake_up(pvt, 0);
203
		default_tps_wake_up(pvt, 0);
204
	}
204
	}
205
}
205
}
206

    
   
206

   
207
static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
207
static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
208
{
208
{
209
	ast_mutex_destroy(&pvt->lock);
209
	ast_mutex_destroy(&pvt->lock);
210
	ast_cond_destroy(&pvt->cond);
210
	ast_cond_destroy(&pvt->cond);
211
	ast_free(pvt);
211
	ast_free(pvt);
212
}
212
}
213

    
   
213

   
214
static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
214
static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
215
{
215
{
216
	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
216
	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
217
	default_tps_wake_up(pvt, 1);
217
	default_tps_wake_up(pvt, 1);
218
	pthread_join(pvt->poll_thread, NULL);
218
	pthread_join(pvt->poll_thread, NULL);
219
	pvt->poll_thread = AST_PTHREADT_NULL;
219
	pvt->poll_thread = AST_PTHREADT_NULL;
220
	default_listener_pvt_destroy(pvt);
220
	default_listener_pvt_destroy(pvt);
221
}
221
}
222

    
   
222

   
223
static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
223
static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
224
	.start = default_listener_start,
224
	.start = default_listener_start,
225
	.task_pushed = default_task_pushed,
225
	.task_pushed = default_task_pushed,
226
	.shutdown = default_listener_shutdown,
226
	.shutdown = default_listener_shutdown,
227
};
227
};
228

    
   
228

   
229
/*!
229
/*!
230
 * \internal
230
 * \internal
231
 * \brief Clean up resources on Asterisk shutdown
231
 * \brief Clean up resources on Asterisk shutdown
232
 */
232
 */
233
static void tps_shutdown(void)
233
static void tps_shutdown(void)
234
{
234
{
235
	ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
235
	ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
236
	ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");
236
	ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");
237
	tps_singletons = NULL;
237
	tps_singletons = NULL;
238
}
238
}
239

    
   
239

   
240
/* initialize the taskprocessor container and register CLI operations */
240
/* initialize the taskprocessor container and register CLI operations */
241
int ast_tps_init(void)
241
int ast_tps_init(void)
242
{
242
{
243
	if (!(tps_singletons = ao2_container_alloc(TPS_MAX_BUCKETS, tps_hash_cb, tps_cmp_cb))) {
243
	if (!(tps_singletons = ao2_container_alloc(TPS_MAX_BUCKETS, tps_hash_cb, tps_cmp_cb))) {
244
		ast_log(LOG_ERROR, "taskprocessor container failed to initialize!\n");
244
		ast_log(LOG_ERROR, "taskprocessor container failed to initialize!\n");
245
		return -1;
245
		return -1;
246
	}
246
	}
247

    
   
247

   
248
	ast_cond_init(&cli_ping_cond, NULL);
248
	ast_cond_init(&cli_ping_cond, NULL);
249

    
   
249

   
250
	ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
250
	ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
251

    
   
251

   
252
	ast_register_atexit(tps_shutdown);
252
	ast_register_atexit(tps_shutdown);
253

    
   
253

   
254
	return 0;
254
	return 0;
255
}
255
}
256

    
   
256

   
257
/* allocate resources for the task */
257
/* allocate resources for the task */
258
static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
258
static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
259
{
259
{
260
	struct tps_task *t;
260
	struct tps_task *t;
261
	if ((t = ast_calloc(1, sizeof(*t)))) {
261
	if ((t = ast_calloc(1, sizeof(*t)))) {
262
		t->execute = task_exe;
262
		t->execute = task_exe;
263
		t->datap = datap;
263
		t->datap = datap;
264
	}
264
	}
265
	return t;
265
	return t;
266
}
266
}
267

    
   
267

   
268
/* release task resources */
268
/* release task resources */
269
static void *tps_task_free(struct tps_task *task)
269
static void *tps_task_free(struct tps_task *task)
270
{
270
{
271
	if (task) {
271
	if (task) {
272
		ast_free(task);
272
		ast_free(task);
273
	}
273
	}
274
	return NULL;
274
	return NULL;
275
}
275
}
276

    
   
276

   
277
/* taskprocessor tab completion */
277
/* taskprocessor tab completion */
278
static char *tps_taskprocessor_tab_complete(struct ast_taskprocessor *p, struct ast_cli_args *a)
278
static char *tps_taskprocessor_tab_complete(struct ast_taskprocessor *p, struct ast_cli_args *a)
279
{
279
{
280
	int tklen;
280
	int tklen;
281
	int wordnum = 0;
281
	int wordnum = 0;
282
	char *name = NULL;
282
	char *name = NULL;
283
	struct ao2_iterator i;
283
	struct ao2_iterator i;
284

    
   
284

   
285
	if (a->pos != 3)
285
	if (a->pos != 3)
286
		return NULL;
286
		return NULL;
287

    
   
287

   
288
	tklen = strlen(a->word);
288
	tklen = strlen(a->word);
289
	i = ao2_iterator_init(tps_singletons, 0);
289
	i = ao2_iterator_init(tps_singletons, 0);
290
	while ((p = ao2_iterator_next(&i))) {
290
	while ((p = ao2_iterator_next(&i))) {
291
		if (!strncasecmp(a->word, p->name, tklen) && ++wordnum > a->n) {
291
		if (!strncasecmp(a->word, p->name, tklen) && ++wordnum > a->n) {
292
			name = ast_strdup(p->name);
292
			name = ast_strdup(p->name);
293
			ao2_ref(p, -1);
293
			ao2_ref(p, -1);
294
			break;
294
			break;
295
		}
295
		}
296
		ao2_ref(p, -1);
296
		ao2_ref(p, -1);
297
	}
297
	}
298
	ao2_iterator_destroy(&i);
298
	ao2_iterator_destroy(&i);
299
	return name;
299
	return name;
300
}
300
}
301

    
   
301

   
302
/* ping task handling function */
302
/* ping task handling function */
303
static int tps_ping_handler(void *datap)
303
static int tps_ping_handler(void *datap)
304
{
304
{
305
	ast_mutex_lock(&cli_ping_cond_lock);
305
	ast_mutex_lock(&cli_ping_cond_lock);
306
	ast_cond_signal(&cli_ping_cond);
306
	ast_cond_signal(&cli_ping_cond);
307
	ast_mutex_unlock(&cli_ping_cond_lock);
307
	ast_mutex_unlock(&cli_ping_cond_lock);
308
	return 0;
308
	return 0;
309
}
309
}
310

    
   
310

   
311
/* ping the specified taskprocessor and display the ping time on the CLI */
311
/* ping the specified taskprocessor and display the ping time on the CLI */
312
static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
312
static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
313
{
313
{
314
	struct timeval begin, end, delta;
314
	struct timeval begin, end, delta;
315
	const char *name;
315
	const char *name;
316
	struct timeval when;
316
	struct timeval when;
317
	struct timespec ts;
317
	struct timespec ts;
318
	struct ast_taskprocessor *tps = NULL;
318
	struct ast_taskprocessor *tps = NULL;
319

    
   
319

   
320
	switch (cmd) {
320
	switch (cmd) {
321
	case CLI_INIT:
321
	case CLI_INIT:
322
		e->command = "core ping taskprocessor";
322
		e->command = "core ping taskprocessor";
323
		e->usage =
323
		e->usage =
324
			"Usage: core ping taskprocessor <taskprocessor>\n"
324
			"Usage: core ping taskprocessor <taskprocessor>\n"
325
			"	Displays the time required for a task to be processed\n";
325
			"	Displays the time required for a task to be processed\n";
326
		return NULL;
326
		return NULL;
327
	case CLI_GENERATE:
327
	case CLI_GENERATE:
328
		return tps_taskprocessor_tab_complete(tps, a);
328
		return tps_taskprocessor_tab_complete(tps, a);
329
	}
329
	}
330

    
   
330

   
331
	if (a->argc != 4)
331
	if (a->argc != 4)
332
		return CLI_SHOWUSAGE;
332
		return CLI_SHOWUSAGE;
333

    
   
333

   
334
	name = a->argv[3];
334
	name = a->argv[3];
335
	if (!(tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS))) {
335
	if (!(tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS))) {
336
		ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
336
		ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
337
		return CLI_SUCCESS;
337
		return CLI_SUCCESS;
338
	}
338
	}
339
	ast_cli(a->fd, "\npinging %s ...", name);
339
	ast_cli(a->fd, "\npinging %s ...", name);
340
	when = ast_tvadd((begin = ast_tvnow()), ast_samp2tv(1000, 1000));
340
	when = ast_tvadd((begin = ast_tvnow()), ast_samp2tv(1000, 1000));
341
	ts.tv_sec = when.tv_sec;
341
	ts.tv_sec = when.tv_sec;
342
	ts.tv_nsec = when.tv_usec * 1000;
342
	ts.tv_nsec = when.tv_usec * 1000;
343
	ast_mutex_lock(&cli_ping_cond_lock);
343
	ast_mutex_lock(&cli_ping_cond_lock);
344
	if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
344
	if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
345
		ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
345
		ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
346
		ao2_ref(tps, -1);
346
		ao2_ref(tps, -1);
347
		return CLI_FAILURE;
347
		return CLI_FAILURE;
348
	}
348
	}
349
	ast_cond_timedwait(&cli_ping_cond, &cli_ping_cond_lock, &ts);
349
	ast_cond_timedwait(&cli_ping_cond, &cli_ping_cond_lock, &ts);
350
	ast_mutex_unlock(&cli_ping_cond_lock);
350
	ast_mutex_unlock(&cli_ping_cond_lock);
351
	end = ast_tvnow();
351
	end = ast_tvnow();
352
	delta = ast_tvsub(end, begin);
352
	delta = ast_tvsub(end, begin);
353
	ast_cli(a->fd, "\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name, (long)delta.tv_sec, (long int)delta.tv_usec);
353
	ast_cli(a->fd, "\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name, (long)delta.tv_sec, (long int)delta.tv_usec);
354
	ao2_ref(tps, -1);
354
	ao2_ref(tps, -1);
355
	return CLI_SUCCESS;
355
	return CLI_SUCCESS;
356
}
356
}
357

    
   
357

   
358
static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
358
static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
359
{
359
{
360
	char name[256];
360
	char name[256];
361
	int tcount;
361
	int tcount;
362
	unsigned long qsize;
362
	unsigned long qsize;
363
	unsigned long maxqsize;
363
	unsigned long maxqsize;
364
	unsigned long processed;
364
	unsigned long processed;
365
	struct ast_taskprocessor *p;
365
	struct ast_taskprocessor *p;
366
	struct ao2_iterator i;
366
	struct ao2_iterator i;
367

    
   
367

   
368
	switch (cmd) {
368
	switch (cmd) {
369
	case CLI_INIT:
369
	case CLI_INIT:
370
		e->command = "core show taskprocessors";
370
		e->command = "core show taskprocessors";
371
		e->usage =
371
		e->usage =
372
			"Usage: core show taskprocessors\n"
372
			"Usage: core show taskprocessors\n"
373
			"	Shows a list of instantiated task processors and their statistics\n";
373
			"	Shows a list of instantiated task processors and their statistics\n";
374
		return NULL;
374
		return NULL;
375
	case CLI_GENERATE:
375
	case CLI_GENERATE:
376
		return NULL;
376
		return NULL;
377
	}
377
	}
378

    
   
378

   
379
	if (a->argc != e->args)
379
	if (a->argc != e->args)
380
		return CLI_SHOWUSAGE;
380
		return CLI_SHOWUSAGE;
381

    
   
381

   
382
	ast_cli(a->fd, "\n\t+----- Processor -----+--- Processed ---+- In Queue -+- Max Depth -+");
382
	ast_cli(a->fd, "\n\t+----- Processor -----+--- Processed ---+- In Queue -+- Max Depth -+");
383
	i = ao2_iterator_init(tps_singletons, 0);
383
	i = ao2_iterator_init(tps_singletons, 0);
384
	while ((p = ao2_iterator_next(&i))) {
384
	while ((p = ao2_iterator_next(&i))) {
385
		ast_copy_string(name, p->name, sizeof(name));
385
		ast_copy_string(name, p->name, sizeof(name));
386
		qsize = p->tps_queue_size;
386
		qsize = p->tps_queue_size;
387
		maxqsize = p->stats->max_qsize;
387
		maxqsize = p->stats->max_qsize;
388
		processed = p->stats->_tasks_processed_count;
388
		processed = p->stats->_tasks_processed_count;
389
		ast_cli(a->fd, "\n%24s   %17ld %12ld %12ld", name, processed, qsize, maxqsize);
389
		ast_cli(a->fd, "\n%24s   %17ld %12ld %12ld", name, processed, qsize, maxqsize);
390
		ao2_ref(p, -1);
390
		ao2_ref(p, -1);
391
	}
391
	}
392
	ao2_iterator_destroy(&i);
392
	ao2_iterator_destroy(&i);
393
	tcount = ao2_container_count(tps_singletons);
393
	tcount = ao2_container_count(tps_singletons);
394
	ast_cli(a->fd, "\n\t+---------------------+-----------------+------------+-------------+\n\t%d taskprocessors\n\n", tcount);
394
	ast_cli(a->fd, "\n\t+---------------------+-----------------+------------+-------------+\n\t%d taskprocessors\n\n", tcount);
395
	return CLI_SUCCESS;
395
	return CLI_SUCCESS;
396
}
396
}
397

    
   
397

   
398
/* hash callback for astobj2 */
398
/* hash callback for astobj2 */
399
static int tps_hash_cb(const void *obj, const int flags)
399
static int tps_hash_cb(const void *obj, const int flags)
400
{
400
{
401
	const struct ast_taskprocessor *tps = obj;
401
	const struct ast_taskprocessor *tps = obj;
402
	const char *name = flags & OBJ_KEY ? obj : tps->name;
402
	const char *name = flags & OBJ_KEY ? obj : tps->name;
403

    
   
403

   
404
	return ast_str_case_hash(name);
404
	return ast_str_case_hash(name);
405
}
405
}
406

    
   
406

   
407
/* compare callback for astobj2 */
407
/* compare callback for astobj2 */
408
static int tps_cmp_cb(void *obj, void *arg, int flags)
408
static int tps_cmp_cb(void *obj, void *arg, int flags)
409
{
409
{
410
	struct ast_taskprocessor *lhs = obj, *rhs = arg;
410
	struct ast_taskprocessor *lhs = obj, *rhs = arg;
411
	const char *rhsname = flags & OBJ_KEY ? arg : rhs->name;
411
	const char *rhsname = flags & OBJ_KEY ? arg : rhs->name;
412

    
   
412

   
413
	return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
413
	return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
414
}
414
}
415

    
   
415

   
416
/* destroy the taskprocessor */
416
/* destroy the taskprocessor */
417
static void tps_taskprocessor_destroy(void *tps)
417
static void tps_taskprocessor_destroy(void *tps)
418
{
418
{
419
	struct ast_taskprocessor *t = tps;
419
	struct ast_taskprocessor *t = tps;
420
	struct tps_task *task;
420
	struct tps_task *task;
421

    
   
421

   
422
	if (!tps) {
422
	if (!tps) {
423
		ast_log(LOG_ERROR, "missing taskprocessor\n");
423
		ast_log(LOG_ERROR, "missing taskprocessor\n");
424
		return;
424
		return;
425
	}
425
	}
426
	ast_debug(1, "destroying taskprocessor '%s'\n", t->name);
426
	ast_debug(1, "destroying taskprocessor '%s'\n", t->name);
427
	/* free it */
427
	/* free it */
428
	if (t->stats) {
428
	if (t->stats) {
429
		ast_free(t->stats);
429
		ast_free(t->stats);
430
		t->stats = NULL;
430
		t->stats = NULL;
431
	}
431
	}
432
	ast_free((char *) t->name);
432
	ast_free((char *) t->name);
433
	if (t->listener) {
433
	if (t->listener) {
434
		/* This code should not be reached since the listener
434
		/* This code should not be reached since the listener
435
		 * should have been destroyed before the taskprocessor could
435
		 * should have been destroyed before the taskprocessor could
436
		 * be destroyed
436
		 * be destroyed
437
		 */
437
		 */
438
		ao2_ref(t->listener, -1);
438
		ao2_ref(t->listener, -1);
439
		t->listener = NULL;
439
		t->listener = NULL;
440
	}
440
	}
441
	while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) {
441
	while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) {
442
		tps_task_free(task);
442
		tps_task_free(task);
443
	}
443
	}
444
}
444
}
445

    
   
445

   
446
/* pop the front task and return it */
446
/* pop the front task and return it */
447
static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
447
static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
448
{
448
{
449
	struct tps_task *task;
449
	struct tps_task *task;
450
	SCOPED_AO2LOCK(lock, tps);
450
	SCOPED_AO2LOCK(lock, tps);
451

    
   
451

   
452
	if (tps->shutting_down) {

   
453
		return NULL;

   
454
	}

   
455

    
   

   
456
	if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
452
	if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
457
		tps->tps_queue_size--;
453
		tps->tps_queue_size--;
458
	}
454
	}
459
	return task;
455
	return task;
460
}
456
}
461

    
   
457

   
462
static int tps_taskprocessor_depth(struct ast_taskprocessor *tps)
458
static int tps_taskprocessor_depth(struct ast_taskprocessor *tps)
463
{
459
{
464
	return (tps) ? tps->tps_queue_size : -1;
460
	return (tps) ? tps->tps_queue_size : -1;
465
}
461
}
466

    
   
462

   
467
/* taskprocessor name accessor */
463
/* taskprocessor name accessor */
468
const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
464
const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
469
{
465
{
470
	if (!tps) {
466
	if (!tps) {
471
		ast_log(LOG_ERROR, "no taskprocessor specified!\n");
467
		ast_log(LOG_ERROR, "no taskprocessor specified!\n");
472
		return NULL;
468
		return NULL;
473
	}
469
	}
474
	return tps->name;
470
	return tps->name;
475
}
471
}
476

    
   
472

   
477
static void listener_shutdown(struct ast_taskprocessor_listener *listener)
473
static void listener_shutdown(struct ast_taskprocessor_listener *listener)
478
{
474
{
479
	listener->callbacks->shutdown(listener);
475
	listener->callbacks->shutdown(listener);
480
	ao2_ref(listener->tps, -1);
476
	ao2_ref(listener->tps, -1);
481
}
477
}
482

    
   
478

   
483
struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
479
struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
484
{
480
{
485
	RAII_VAR(struct ast_taskprocessor_listener *, listener,
481
	RAII_VAR(struct ast_taskprocessor_listener *, listener,
486
			ao2_alloc(sizeof(*listener), NULL), ao2_cleanup);
482
			ao2_alloc(sizeof(*listener), NULL), ao2_cleanup);
487

    
   
483

   
488
	if (!listener) {
484
	if (!listener) {
489
		return NULL;
485
		return NULL;
490
	}
486
	}
491
	listener->callbacks = callbacks;
487
	listener->callbacks = callbacks;
492
	listener->user_data = user_data;
488
	listener->user_data = user_data;
493

    
   
489

   
494
	ao2_ref(listener, +1);
490
	ao2_ref(listener, +1);
495
	return listener;
491
	return listener;
496
}
492
}
497

    
   
493

   
498
struct ast_taskprocessor *ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
494
struct ast_taskprocessor *ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
499
{
495
{
500
	ao2_ref(listener->tps, +1);
496
	ao2_ref(listener->tps, +1);
501
	return listener->tps;
497
	return listener->tps;
502
}
498
}
503

    
   
499

   
504
void *ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
500
void *ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
505
{
501
{
506
	return listener->user_data;
502
	return listener->user_data;
507
}
503
}
508

    
   
504

   
509
static void *default_listener_pvt_alloc(void)
505
static void *default_listener_pvt_alloc(void)
510
{
506
{
511
	struct default_taskprocessor_listener_pvt *pvt;
507
	struct default_taskprocessor_listener_pvt *pvt;
512

    
   
508

   
513
	pvt = ast_calloc(1, sizeof(*pvt));
509
	pvt = ast_calloc(1, sizeof(*pvt));
514
	if (!pvt) {
510
	if (!pvt) {
515
		return NULL;
511
		return NULL;
516
	}
512
	}
517
	ast_cond_init(&pvt->cond, NULL);
513
	ast_cond_init(&pvt->cond, NULL);
518
	ast_mutex_init(&pvt->lock);
514
	ast_mutex_init(&pvt->lock);
519
	pvt->poll_thread = AST_PTHREADT_NULL;
515
	pvt->poll_thread = AST_PTHREADT_NULL;
520
	return pvt;
516
	return pvt;
521
}
517
}
522

    
   
518

   
523
static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
519
static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
524
{
520
{
525
	RAII_VAR(struct ast_taskprocessor *, p,
521
	RAII_VAR(struct ast_taskprocessor *, p,
526
			ao2_alloc(sizeof(*p), tps_taskprocessor_destroy), ao2_cleanup);
522
			ao2_alloc(sizeof(*p), tps_taskprocessor_destroy), ao2_cleanup);
527

    
   
523

   
528
	if (!p) {
524
	if (!p) {
529
		ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
525
		ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
530
		return NULL;
526
		return NULL;
531
	}
527
	}
532

    
   
528

   
533
	if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) {
529
	if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) {
534
		ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name);
530
		ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name);
535
		return NULL;
531
		return NULL;
536
	}
532
	}
537
	if (!(p->name = ast_strdup(name))) {
533
	if (!(p->name = ast_strdup(name))) {
538
		ao2_ref(p, -1);
534
		ao2_ref(p, -1);
539
		return NULL;
535
		return NULL;
540
	}
536
	}
541

    
   
537

   
542
	ao2_ref(listener, +1);
538
	ao2_ref(listener, +1);
543
	p->listener = listener;
539
	p->listener = listener;
544

    
   
540

   
545
	ao2_ref(p, +1);
541
	ao2_ref(p, +1);
546
	listener->tps = p;
542
	listener->tps = p;
547

    
   
543

   
548
	if (!(ao2_link(tps_singletons, p))) {
544
	if (!(ao2_link(tps_singletons, p))) {
549
		ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
545
		ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
550
		return NULL;
546
		return NULL;
551
	}
547
	}
552

    
   
548

   
553
	if (p->listener->callbacks->start(p->listener)) {
549
	if (p->listener->callbacks->start(p->listener)) {
554
		ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n", p->name);
550
		ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n", p->name);
555
		ast_taskprocessor_unreference(p);
551
		ast_taskprocessor_unreference(p);
556
		return NULL;
552
		return NULL;
557
	}
553
	}
558

    
   
554

   
559
	/* RAII_VAR will decrement the refcount at the end of the function.
555
	/* RAII_VAR will decrement the refcount at the end of the function.
560
	 * Since we want to pass back a reference to p, we bump the refcount
556
	 * Since we want to pass back a reference to p, we bump the refcount
561
	 */
557
	 */
562
	ao2_ref(p, +1);
558
	ao2_ref(p, +1);
563
	return p;
559
	return p;
564

    
   
560

   
565
}
561
}
566

    
   
562

   
567
/* Provide a reference to a taskprocessor.  Create the taskprocessor if necessary, but don't
563
/* Provide a reference to a taskprocessor.  Create the taskprocessor if necessary, but don't
568
 * create the taskprocessor if we were told via ast_tps_options to return a reference only
564
 * create the taskprocessor if we were told via ast_tps_options to return a reference only
569
 * if it already exists */
565
 * if it already exists */
570
struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_options create)
566
struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_options create)
571
{
567
{
572
	struct ast_taskprocessor *p;
568
	struct ast_taskprocessor *p;
573
	struct ast_taskprocessor_listener *listener;
569
	struct ast_taskprocessor_listener *listener;
574
	struct default_taskprocessor_listener_pvt *pvt;
570
	struct default_taskprocessor_listener_pvt *pvt;
575

    
   
571

   
576
	if (ast_strlen_zero(name)) {
572
	if (ast_strlen_zero(name)) {
577
		ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
573
		ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
578
		return NULL;
574
		return NULL;
579
	}
575
	}
580
	p = ao2_find(tps_singletons, name, OBJ_KEY);
576
	p = ao2_find(tps_singletons, name, OBJ_KEY);
581
	if (p) {
577
	if (p) {
582
		return p;
578
		return p;
583
	}
579
	}
584
	if (create & TPS_REF_IF_EXISTS) {
580
	if (create & TPS_REF_IF_EXISTS) {
585
		/* calling function does not want a new taskprocessor to be created if it doesn't already exist */
581
		/* calling function does not want a new taskprocessor to be created if it doesn't already exist */
586
		return NULL;
582
		return NULL;
587
	}
583
	}
588
	/* Create a new taskprocessor. Start by creating a default listener */
584
	/* Create a new taskprocessor. Start by creating a default listener */
589
	pvt = default_listener_pvt_alloc();
585
	pvt = default_listener_pvt_alloc();
590
	if (!pvt) {
586
	if (!pvt) {
591
		return NULL;
587
		return NULL;
592
	}
588
	}
593
	listener = ast_taskprocessor_listener_alloc(&default_listener_callbacks, pvt);
589
	listener = ast_taskprocessor_listener_alloc(&default_listener_callbacks, pvt);
594
	if (!listener) {
590
	if (!listener) {
595
		default_listener_pvt_destroy(pvt);
591
		default_listener_pvt_destroy(pvt);
596
		return NULL;
592
		return NULL;
597
	}
593
	}
598

    
   
594

   
599
	p = __allocate_taskprocessor(name, listener);
595
	p = __allocate_taskprocessor(name, listener);
600
	if (!p) {
596
	if (!p) {
601
		default_listener_pvt_destroy(pvt);
597
		default_listener_pvt_destroy(pvt);
602
		ao2_ref(listener, -1);
598
		ao2_ref(listener, -1);
603
		return NULL;
599
		return NULL;
604
	}
600
	}
605

    
   
601

   
606
	/* Unref listener here since the taskprocessor has gained a reference to the listener */
602
	/* Unref listener here since the taskprocessor has gained a reference to the listener */
607
	ao2_ref(listener, -1);
603
	ao2_ref(listener, -1);
608
	return p;
604
	return p;
609

    
   
605

   
610
}
606
}
611

    
   
607

   
612
struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
608
struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
613
{
609
{
614
	struct ast_taskprocessor *p = ao2_find(tps_singletons, name, OBJ_KEY);
610
	struct ast_taskprocessor *p = ao2_find(tps_singletons, name, OBJ_KEY);
615

    
   
611

   
616
	if (p) {
612
	if (p) {
617
		ast_taskprocessor_unreference(p);
613
		ast_taskprocessor_unreference(p);
618
		return NULL;
614
		return NULL;
619
	}
615
	}
620
	return __allocate_taskprocessor(name, listener);
616
	return __allocate_taskprocessor(name, listener);
621
}
617
}
622

    
   
618

   
623
/* decrement the taskprocessor reference count and unlink from the container if necessary */
619
/* decrement the taskprocessor reference count and unlink from the container if necessary */
624
void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
620
void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
625
{
621
{
626
	if (!tps) {
622
	if (!tps) {
627
		return NULL;
623
		return NULL;
628
	}
624
	}
629

    
   
625

   
630
	if (ao2_ref(tps, -1) > 3) {
626
	if (ao2_ref(tps, -1) > 3) {
631
		return NULL;
627
		return NULL;
632
	}
628
	}
633
	/* If we're down to 3 references, then those must be:
629
	/* If we're down to 3 references, then those must be:
634
	 * 1. The reference we just got rid of
630
	 * 1. The reference we just got rid of
635
	 * 2. The container
631
	 * 2. The container
636
	 * 3. The listener
632
	 * 3. The listener
637
	 */
633
	 */
638
	ao2_unlink(tps_singletons, tps);
634
	ao2_unlink(tps_singletons, tps);
639
	listener_shutdown(tps->listener);
635
	listener_shutdown(tps->listener);
640
	return NULL;
636
	return NULL;
641
}
637
}
642

    
   
638

   
643
/* push the task into the taskprocessor queue */
639
/* push the task into the taskprocessor queue */
644
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
640
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
645
{
641
{
646
	struct tps_task *t;
642
	struct tps_task *t;
647
	int previous_size;
643
	int previous_size;
648
	int was_empty;
644
	int was_empty;
649

    
   
645

   
650
	if (!tps || !task_exe) {
646
	if (!tps || !task_exe) {
651
		ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor");
647
		ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor");
652
		return -1;
648
		return -1;
653
	}
649
	}
654
	if (!(t = tps_task_alloc(task_exe, datap))) {
650
	if (!(t = tps_task_alloc(task_exe, datap))) {
655
		ast_log(LOG_ERROR, "failed to allocate task!  Can't push to '%s'\n", tps->name);
651
		ast_log(LOG_ERROR, "failed to allocate task!  Can't push to '%s'\n", tps->name);
656
		return -1;
652
		return -1;
657
	}
653
	}
658
	ao2_lock(tps);
654
	ao2_lock(tps);
659
	AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
655
	AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
660
	previous_size = tps->tps_queue_size++;
656
	previous_size = tps->tps_queue_size++;
661
	/* The currently executing task counts as still in queue */
657
	/* The currently executing task counts as still in queue */
662
	was_empty = tps->executing ? 0 : previous_size == 0;
658
	was_empty = tps->executing ? 0 : previous_size == 0;
663
	ao2_unlock(tps);
659
	ao2_unlock(tps);
664
	tps->listener->callbacks->task_pushed(tps->listener, was_empty);
660
	tps->listener->callbacks->task_pushed(tps->listener, was_empty);
665
	return 0;
661
	return 0;
666
}
662
}
667

    
   
663

   
668
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
664
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
669
{
665
{
670
	struct tps_task *t;
666
	struct tps_task *t;
671
	int size;
667
	int size;
672

    
   
668

   
673
	ao2_lock(tps);
669
	ao2_lock(tps);
674
	tps->executing = 1;
670
	tps->executing = 1;
675
	ao2_unlock(tps);
671
	ao2_unlock(tps);
676

    
   
672

   
677
	if (!(t = tps_taskprocessor_pop(tps))) {
673
	t = tps_taskprocessor_pop(tps);
678
		ao2_lock(tps);

   
679
		/* We need to check size in the same critical section where we

   
680
		 * reset the executing bit. Avoids a race condition where a task

   
681
		 * is pushed right after we pop an empty stack.

   
682
		 */

   
683
		tps->executing = 0;

   
684
		size = tps_taskprocessor_depth(tps);

   
685
		ao2_unlock(tps);

   
686
		return size > 0;

   
687
	}

   
688

    
   
674

   

    
   
675
	if (t) {
689
	t->execute(t->datap);
676
		t->execute(t->datap);
690

    
   

   
691
	tps_task_free(t);
677
		tps_task_free(t);

    
   
678
	}
692

    
   
679

   
693
	ao2_lock(tps);
680
	ao2_lock(tps);

    
   
681
	/* We need to check size in the same critical section where we reset the

    
   
682
	 * executing bit. Avoids a race condition where a task is pushed right

    
   
683
	 * after we pop an empty stack.

    
   
684
	 */
694
	tps->executing = 0;
685
	tps->executing = 0;
695
	size = tps_taskprocessor_depth(tps);
686
	size = tps_taskprocessor_depth(tps);
696
	if (tps->stats) {
687
	/* If we executed a task, bump the stats */

    
   
688
	if (t && tps->stats) {
697
		tps->stats->_tasks_processed_count++;
689
		tps->stats->_tasks_processed_count++;
698
		if (size > tps->stats->max_qsize) {
690
		if (size > tps->stats->max_qsize) {
699
			tps->stats->max_qsize = size;
691
			tps->stats->max_qsize = size;
700
		}
692
		}
701
	}
693
	}
702
	ao2_unlock(tps);
694
	ao2_unlock(tps);
703

    
   
695

   
704
	if (size == 0 && tps->listener->callbacks->emptied) {
696
	/* If we executed a task, check for the transition to empty */

    
   
697
	if (t && size == 0 && tps->listener->callbacks->emptied) {
705
		tps->listener->callbacks->emptied(tps->listener);
698
		tps->listener->callbacks->emptied(tps->listener);
706
		return 0;

   
707
	}
699
	}
708
	return 1;
700
	return size > 0;
709
}
701
}
/trunk/tests/test_taskprocessor.c
Diff Revision 1 Diff Revision 2
 
/trunk/tests/test_threadpool.c
Diff Revision 1 Diff Revision 2
 
  1. /trunk/main/taskprocessor.c: Loading...
  2. /trunk/tests/test_taskprocessor.c: Loading...
  3. /trunk/tests/test_threadpool.c: Loading...

https://reviewboard.asterisk.org/ runs on a server provided by Digium, Inc. and uses bandwidth donated to the open source Asterisk community by API Digital Communications in Huntsville, AL USA.
Please report problems with this site to asteriskteam@digium.com.