/*******************************************************************************
* CGoGN: Combinatorial and Geometric modeling with Generic N-dimensional Maps  *
* version 0.1                                                                  *
* Copyright (C) 2009-2012, IGG Team, LSIIT, University of Strasbourg           *
*                                                                              *
* This library is free software; you can redistribute it and/or modify it      *
* under the terms of the GNU Lesser General Public License as published by the *
* Free Software Foundation; either version 2.1 of the License, or (at your     *
* option) any later version.                                                   *
*                                                                              *
* This library is distributed in the hope that it will be useful, but WITHOUT  *
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or        *
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License  *
* for more details.                                                            *
*                                                                              *
* You should have received a copy of the GNU Lesser General Public License     *
* along with this library; if not, write to the Free Software Foundation,      *
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA.           *
*                                                                              *
* Web site: http://cgogn.unistra.fr/                                           *
* Contact information: cgogn@unistra.fr                                        *
*                                                                              *
*******************************************************************************/
#include <cassert>
#include <vector>

#include <boost/thread.hpp>
#include <boost/thread/barrier.hpp>

namespace CGoGN
{

namespace Algo
{

namespace Parallel
{


/**
 *
 */
template<typename MAP>
class ThreadFunction
{
protected:
	std::vector<Dart>& m_darts;
	boost::barrier& m_sync1;
	boost::barrier& m_sync2;
	bool& m_finished;
	FunctorMapThreaded<MAP>* m_functor;
public:
	ThreadFunction(FunctorMapThreaded<MAP>& func, std::vector<Dart>& vd, boost::barrier& s1, boost::barrier& s2, bool& finished, unsigned int id):
		m_darts(vd), m_sync1(s1), m_sync2(s2), m_finished(finished)
	{
		m_functor = func.duplicate(id);
	}

	ThreadFunction(const ThreadFunction<MAP>& tf):
		m_darts(tf.m_darts), m_sync1(tf.m_sync1), m_sync2(tf.m_sync2), m_finished(tf.m_finished), m_functor(tf.m_functor){}

	void operator()()
	{
		while (!m_finished)
		{
			for (std::vector<Dart>::const_iterator it = m_darts.begin(); it != m_darts.end(); ++it)
				m_functor->operator()(*it);
			m_sync1.wait();
			m_sync2.wait();
		}
	}

	void clean()
	{
		delete m_functor;
	}
Sylvain Thery's avatar
Sylvain Thery committed
75

76
};
Sylvain Thery's avatar
Sylvain Thery committed
77

78 79
template<typename MAP, typename T>
class ThreadFunctionResult
Sylvain Thery's avatar
Sylvain Thery committed
80 81
{
protected:
82
	std::vector<Dart>& m_darts;
Sylvain Thery's avatar
Sylvain Thery committed
83 84 85
	boost::barrier& m_sync1;
	boost::barrier& m_sync2;
	bool& m_finished;
86 87
	FunctorMapThreadedResult<MAP,T>* m_functor;
	T& m_result;
Sylvain Thery's avatar
Sylvain Thery committed
88
public:
89 90 91 92 93 94 95 96 97 98 99 100 101
	ThreadFunctionResult(FunctorMapThreadedResult<MAP,T>& func, std::vector<Dart>& vd, boost::barrier& s1, boost::barrier& s2, bool& finished, unsigned int id, T& result):
		m_darts(vd), m_sync1(s1), m_sync2(s2), m_finished(finished), m_result(result)
	{
		m_functor = reinterpret_cast< FunctorMapThreadedResult<MAP,T>* >(func.duplicate(id));
	}

	ThreadFunctionResult(const ThreadFunctionResult<MAP,T>& tf):
		m_darts(tf.m_darts),
		m_sync1(tf.m_sync1),
		m_sync2(tf.m_sync2),
		m_finished(tf.m_finished),
		m_functor(tf.m_functor),
		m_result(tf.m_result){}
Sylvain Thery's avatar
Sylvain Thery committed
102 103 104

	void operator()()
	{
105

Sylvain Thery's avatar
Sylvain Thery committed
106 107 108
		while (!m_finished)
		{
			for (std::vector<Dart>::const_iterator it = m_darts.begin(); it != m_darts.end(); ++it)
109
				m_functor->operator()(*it);
Sylvain Thery's avatar
Sylvain Thery committed
110 111 112
			m_sync1.wait();
			m_sync2.wait();
		}
113 114 115 116 117 118
		m_result = this->m_functor->getResult();
	}

	void clean()
	{
		delete m_functor;
Sylvain Thery's avatar
Sylvain Thery committed
119 120 121
	}
};

122 123


124 125
template <typename PFP, unsigned int ORBIT>
void foreach_orbit(typename PFP::MAP& map, FunctorMapThreaded<typename PFP::MAP>& func, unsigned int nbth, unsigned int szbuff, bool needMarkers, const FunctorSelect& good)
Sylvain Thery's avatar
Sylvain Thery committed
126
{
127 128
	std::vector<Dart>* vd = new std::vector<Dart>[nbth];
	boost::thread** threads = new boost::thread*[nbth];
Sylvain Thery's avatar
Sylvain Thery committed
129 130

	DartMarker dm(map);
131
	Dart d = map.begin();
Sylvain Thery's avatar
Sylvain Thery committed
132 133 134 135 136 137

	// nbth new functions, new thread (with good darts !)
	for (unsigned int i=0; i<nbth; ++i)
		vd[i].reserve(szbuff);

	// fill each vd buffers with 4096 darts
138 139
	unsigned int nb = 0;
	while ((d != map.end()) && (nb < nbth*szbuff) )
Sylvain Thery's avatar
Sylvain Thery committed
140 141 142
	{
		if (good(d) && (!dm.isMarked(d)))
		{
143
			dm.markOrbit<ORBIT>(d);
Sylvain Thery's avatar
Sylvain Thery committed
144 145 146 147 148 149
			vd[nb%nbth].push_back(d);
			nb++;
		}
		map.next(d);
	}

150 151 152 153 154 155 156 157
	if (needMarkers)
	{
		// ensure that there is enough threads
		unsigned int nbth_prec = map.getNbThreadMarkers();
		if (nbth_prec < nbth+1)
			map.addThreadMarker(nbth+1-nbth_prec);
	}

Sylvain Thery's avatar
Sylvain Thery committed
158 159
	boost::barrier sync1(nbth+1);
	boost::barrier sync2(nbth+1);
160
	bool finished = false;
Sylvain Thery's avatar
Sylvain Thery committed
161
	// lauch threads
162 163
	if (needMarkers)
	{
164
		for (unsigned int i = 0; i < nbth; ++i)
165 166 167 168
			threads[i] = new boost::thread(ThreadFunction<typename PFP::MAP>(func, vd[i],sync1,sync2, finished,1+i));
	}
	else
	{
169
		for (unsigned int i = 0; i < nbth; ++i)
170 171
			threads[i] = new boost::thread(ThreadFunction<typename PFP::MAP>(func, vd[i],sync1,sync2, finished,0));
	}
Sylvain Thery's avatar
Sylvain Thery committed
172 173

	// and continue to traverse the map
174 175
	std::vector<Dart>* tempo = new std::vector<Dart>[nbth];

176
	for (unsigned int i = 0; i < nbth; ++i)
Sylvain Thery's avatar
Sylvain Thery committed
177 178
		tempo[i].reserve(szbuff);

179
	while (d != map.end())
Sylvain Thery's avatar
Sylvain Thery committed
180
	{
181
		for (unsigned int i = 0; i < nbth; ++i)
Sylvain Thery's avatar
Sylvain Thery committed
182 183
			tempo[i].clear();

184 185
		unsigned int nb = 0;
		while ((d != map.end()) && (nb < nbth*szbuff) )
Sylvain Thery's avatar
Sylvain Thery committed
186 187 188
		{
			if (good(d) && (!dm.isMarked(d)))
			{
189
				dm.markOrbit<ORBIT>(d);
Sylvain Thery's avatar
Sylvain Thery committed
190 191 192 193 194 195
				tempo[nb%nbth].push_back(d);
				nb++;
			}
			map.next(d);
		}

196
		// sync and swap the two vectors
Sylvain Thery's avatar
Sylvain Thery committed
197
		sync1.wait();
198
		for (unsigned int i = 0; i < nbth; ++i)
Sylvain Thery's avatar
Sylvain Thery committed
199 200 201 202 203 204 205 206 207
			vd[i].swap(tempo[i]);
		sync2.wait();
	}

	sync1.wait();
	finished = true;
	sync2.wait();

	//wait for all theads to be finished
208
	for (unsigned int i = 0; i < nbth; ++i)
Sylvain Thery's avatar
Sylvain Thery committed
209 210 211 212
	{
		threads[i]->join();
		delete threads[i];
	}
213 214 215 216 217

	// and release memory
	delete[] threads;
	delete[] vd;
	delete[] tempo;
Sylvain Thery's avatar
Sylvain Thery committed
218 219 220 221 222 223 224 225 226 227 228
}

/**
 * Traverse cells of a map in parallel. Use embedding marker
 * Functor application must be independant
 * @param map the map
 * @param func the functor to apply
 * @param nbth number of thread to use (use twice as threads of processor)
 * @param szbuff size of buffers to store darts in each thread (default is 8192, use less for lower memory consumsion)
 * @param good a selector
 */
229 230
template <typename PFP, unsigned int CELL>
void foreach_cell(typename PFP::MAP& map, FunctorMapThreaded<typename PFP::MAP>& func, unsigned int nbth, unsigned int szbuff, bool needMarkers, const FunctorSelect& good)
Sylvain Thery's avatar
Sylvain Thery committed
231
{
sylvain thery's avatar
sylvain thery committed
232 233
	std::vector<Dart>* vd = new std::vector<Dart>[nbth];
	boost::thread** threads = new boost::thread*[nbth];
Sylvain Thery's avatar
Sylvain Thery committed
234

235 236
	CellMarker<CELL> cm(map);
	Dart d = map.begin();
Sylvain Thery's avatar
Sylvain Thery committed
237 238

	// nbth new functions, new thread (with good darts !)
239
	for (unsigned int i = 0; i < nbth; ++i)
Sylvain Thery's avatar
Sylvain Thery committed
240 241 242
		vd[i].reserve(szbuff);

	// fill each vd buffers with 4096 darts
243 244
	unsigned int nb = 0;
	while ((d != map.end()) && (nb < nbth*szbuff) )
Sylvain Thery's avatar
Sylvain Thery committed
245 246 247 248 249 250 251 252 253 254
	{
		if (good(d) && (!cm.isMarked(d)))
		{
			cm.mark(d);
			vd[nb%nbth].push_back(d);
			nb++;
		}
		map.next(d);
	}

255 256 257 258 259 260 261 262
	if (needMarkers)
	{
		// ensure that there is enough threads
		unsigned int nbth_prec = map.getNbThreadMarkers();
		if (nbth_prec < nbth+1)
			map.addThreadMarker(nbth+1-nbth_prec);
	}

Sylvain Thery's avatar
Sylvain Thery committed
263 264 265 266
	boost::barrier sync1(nbth+1);
	boost::barrier sync2(nbth+1);
	bool finished=false;
	// lauch threads
267 268
	if (needMarkers)
	{
269
		for (unsigned int i = 0; i < nbth; ++i)
270 271 272 273
			threads[i] = new boost::thread(ThreadFunction<typename PFP::MAP>(func, vd[i],sync1,sync2, finished,1+i));
	}
	else
	{
274
		for (unsigned int i = 0; i < nbth; ++i)
275 276
			threads[i] = new boost::thread(ThreadFunction<typename PFP::MAP>(func, vd[i],sync1,sync2, finished,0));
	}
Sylvain Thery's avatar
Sylvain Thery committed
277
	// and continue to traverse the map
sylvain thery's avatar
sylvain thery committed
278 279
	std::vector<Dart>* tempo = new std::vector<Dart>[nbth];

280
	for (unsigned int i = 0; i < nbth; ++i)
Sylvain Thery's avatar
Sylvain Thery committed
281 282
		tempo[i].reserve(szbuff);

283
	while (d != map.end())
Sylvain Thery's avatar
Sylvain Thery committed
284
	{
285
		for (unsigned int i = 0; i < nbth; ++i)
Sylvain Thery's avatar
Sylvain Thery committed
286 287
			tempo[i].clear();

288 289
		unsigned int nb = 0;
		while ((d != map.end()) && (nb < nbth*szbuff) )
Sylvain Thery's avatar
Sylvain Thery committed
290 291 292 293 294 295 296 297 298 299 300
		{
			if (good(d) && (!cm.isMarked(d)))
			{
				cm.mark(d);
				tempo[nb%nbth].push_back(d);
				nb++;
			}
			map.next(d);
		}

		sync1.wait();
301
		for (unsigned int i = 0; i < nbth; ++i)
Sylvain Thery's avatar
Sylvain Thery committed
302 303 304 305 306 307 308 309 310
			vd[i].swap(tempo[i]);
		sync2.wait();
	}

	sync1.wait();
	finished = true;
	sync2.wait();

	//wait for all theads to be finished
311
	for (unsigned int i = 0; i < nbth; ++i)
Sylvain Thery's avatar
Sylvain Thery committed
312 313 314 315
	{
		threads[i]->join();
		delete threads[i];
	}
sylvain thery's avatar
sylvain thery committed
316 317 318
	delete[] threads;
	delete[] vd;
	delete[] tempo;
Sylvain Thery's avatar
Sylvain Thery committed
319 320
}

Sylvain Thery's avatar
Sylvain Thery committed
321
template <typename PFP>
322
void foreach_dart(typename PFP::MAP& map, FunctorMapThreaded<typename PFP::MAP>& func, unsigned int nbth, unsigned int szbuff, bool needMarkers, const FunctorSelect& good)
Sylvain Thery's avatar
Sylvain Thery committed
323
{
324 325
	std::vector<Dart>* vd = new std::vector<Dart>[nbth];
	boost::thread** threads = new boost::thread*[nbth];
Sylvain Thery's avatar
Sylvain Thery committed
326

327
	Dart d = map.begin();
Sylvain Thery's avatar
Sylvain Thery committed
328 329

	// nbth new functions, new thread (with good darts !)
330
	for (unsigned int i = 0; i < nbth; ++i)
Sylvain Thery's avatar
Sylvain Thery committed
331 332 333
		vd[i].reserve(szbuff);

	// fill each vd buffers with 4096 darts
334 335
	unsigned int nb = 0;
	while ((d != map.end()) && (nb < nbth*szbuff) )
Sylvain Thery's avatar
Sylvain Thery committed
336 337 338 339 340 341 342 343 344
	{
		if (good(d))
		{
			vd[nb%nbth].push_back(d);
			nb++;
		}
		map.next(d);
	}

345 346 347 348 349 350 351 352
	if (needMarkers)
	{
		// ensure that there is enough threads
		unsigned int nbth_prec = map.getNbThreadMarkers();
		if (nbth_prec < nbth+1)
			map.addThreadMarker(nbth+1-nbth_prec);
	}

Sylvain Thery's avatar
Sylvain Thery committed
353 354
	boost::barrier sync1(nbth+1);
	boost::barrier sync2(nbth+1);
355
	bool finished = false;
Sylvain Thery's avatar
Sylvain Thery committed
356
	// lauch threads
357 358
	if (needMarkers)
	{
359
		for (unsigned int i = 0; i < nbth; ++i)
360 361 362 363
			threads[i] = new boost::thread(ThreadFunction<typename PFP::MAP>(func, vd[i],sync1,sync2, finished,1+i));
	}
	else
	{
364
		for (unsigned int i = 0; i < nbth; ++i)
365 366
			threads[i] = new boost::thread(ThreadFunction<typename PFP::MAP>(func, vd[i],sync1,sync2, finished,0));
	}
Sylvain Thery's avatar
Sylvain Thery committed
367
	// and continue to traverse the map
368
	std::vector<Dart>* tempo = new std::vector<Dart>[nbth];
369
	for (unsigned int i = 0; i < nbth; ++i)
Sylvain Thery's avatar
Sylvain Thery committed
370 371
		tempo[i].reserve(szbuff);

372
	while (d != map.end())
Sylvain Thery's avatar
Sylvain Thery committed
373
	{
374
		for (unsigned int i = 0; i < nbth; ++i)
Sylvain Thery's avatar
Sylvain Thery committed
375 376 377
			tempo[i].clear();

		unsigned int nb =0;
378
		while ((d != map.end()) && (nb < nbth*szbuff) )
Sylvain Thery's avatar
Sylvain Thery committed
379 380 381 382 383 384 385 386 387 388
		{
			if (good(d))
			{
				tempo[nb%nbth].push_back(d);
				nb++;
			}
			map.next(d);
		}

		sync1.wait();
389
		for (unsigned int i = 0; i < nbth; ++i)
Sylvain Thery's avatar
Sylvain Thery committed
390 391 392 393 394 395 396 397 398
			vd[i].swap(tempo[i]);
		sync2.wait();
	}

	sync1.wait();
	finished = true;
	sync2.wait();

	//wait for all theads to be finished
399
	for (unsigned int i = 0; i < nbth; ++i)
Sylvain Thery's avatar
Sylvain Thery committed
400 401 402 403
	{
		threads[i]->join();
		delete threads[i];
	}
404 405 406 407
	
	delete vd;
	delete threads;
	delete tempo;
Sylvain Thery's avatar
Sylvain Thery committed
408 409
}

410 411
template <typename PFP, unsigned int ORBIT, typename T>
void foreach_orbit_res(typename PFP::MAP& map, FunctorMapThreadedResult<typename PFP::MAP, T>& func, unsigned int nbth, unsigned int szbuff, std::vector<T>& results, bool needMarkers, const FunctorSelect& good)
412 413 414
{
	std::vector<Dart>* vd = new std::vector<Dart>[nbth];
	boost::thread** threads = new boost::thread*[nbth];
415
	ThreadFunctionResult<typename PFP::MAP,T>** th_funcs= new ThreadFunctionResult<typename PFP::MAP,T>*[nbth];
416 417

	DartMarker dm(map);
418
	Dart d = map.begin();
419 420

	// nbth new functions, new thread (with good darts !)
421
	for (unsigned int i = 0; i < nbth; ++i)
422 423 424
		vd[i].reserve(szbuff);

	// fill each vd buffers with 4096 darts
425 426
	unsigned int nb = 0;
	while ((d != map.end()) && (nb < nbth*szbuff) )
427 428 429
	{
		if (good(d) && (!dm.isMarked(d)))
		{
430
			dm.markOrbit<ORBIT>(d);
431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453
			vd[nb%nbth].push_back(d);
			nb++;
		}
		map.next(d);
	}

	if (needMarkers)
	{
		// ensure that there is enough threads
		unsigned int nbth_prec = map.getNbThreadMarkers();
		if (nbth_prec < nbth+1)
			map.addThreadMarker(nbth+1-nbth_prec);
	}

	// prepare some space pour results
	results.resize(nbth);

	boost::barrier sync1(nbth+1);
	boost::barrier sync2(nbth+1);
	bool finished=false;
	// lauch threads
	if (needMarkers)
	{
454
		for (unsigned int i = 0; i < nbth; ++i)
455 456 457 458 459 460 461 462
		{
			// here dynamic allocation to allow the freeing of m_functor (clean) at the end
			th_funcs[i] = new ThreadFunctionResult<typename PFP::MAP,T>(func, vd[i],sync1,sync2, finished,1+i,results[i]);
			threads[i] = new boost::thread(*(th_funcs[i]));
		}
	}
	else
	{
463
		for (unsigned int i = 0; i < nbth; ++i)
464 465 466 467 468 469 470 471 472
		{
			th_funcs[i] = new ThreadFunctionResult<typename PFP::MAP,T>(func, vd[i],sync1,sync2, finished,0,results[i]);
			threads[i] = new boost::thread(*(th_funcs[i]));
		}
	}

	// and continue to traverse the map
	std::vector<Dart>* tempo = new std::vector<Dart>[nbth];

473
	for (unsigned int i = 0; i < nbth; ++i)
474 475
		tempo[i].reserve(szbuff);

476
	while (d != map.end())
477
	{
478
		for (unsigned int i = 0; i < nbth; ++i)
479 480
			tempo[i].clear();

481 482
		unsigned int nb = 0;
		while ((d != map.end()) && (nb < nbth*szbuff) )
483 484 485
		{
			if (good(d) && (!dm.isMarked(d)))
			{
486
				dm.markOrbit<ORBIT>(d);
487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505
				tempo[nb%nbth].push_back(d);
				nb++;
			}
			map.next(d);
		}

		// sync and swap the two vectors
		sync1.wait();
		for (unsigned int i=0; i<nbth; ++i)
			vd[i].swap(tempo[i]);
		sync2.wait();
	}

	sync1.wait();
	finished = true;
	sync2.wait();

	std::vector<T> res;
	//wait for all theads to be finished and get results
506
	for (unsigned int i = 0; i < nbth; ++i)
507 508 509 510 511 512 513 514 515 516 517 518 519
	{
		threads[i]->join();
		delete threads[i];
		th_funcs[i]->clean();
	}

	//release dynamic allocation
	delete[] threads;
	delete[] vd;
	delete[] th_funcs;
	delete[] tempo;
}

520 521
template <typename PFP, unsigned int CELL, typename T>
void foreach_cell_res(typename PFP::MAP& map, FunctorMapThreadedResult<typename PFP::MAP, T>& func,  unsigned int nbth, unsigned int szbuff, std::vector<T>& results, bool needMarkers, const FunctorSelect& good)
522 523 524 525 526
{
	std::vector<Dart>* vd = new std::vector<Dart>[nbth];
	boost::thread** threads = new boost::thread*[nbth];
	ThreadFunctionResult<typename PFP::MAP,T>**  th_funcs= new ThreadFunctionResult<typename PFP::MAP,T>*[nbth];

527 528
	CellMarker<CELL> cm(map);
	Dart d = map.begin();
529 530

	// nbth new functions, new thread (with good darts !)
531
	for (unsigned int i = 0; i < nbth; ++i)
532 533 534
		vd[i].reserve(szbuff);

	// fill each vd buffers with 4096 darts
535 536
	unsigned int nb = 0;
	while ((d != map.end()) && (nb < nbth*szbuff) )
537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563
	{
		if (good(d) && (!cm.isMarked(d)))
		{
			cm.mark(d);
			vd[nb%nbth].push_back(d);
			nb++;
		}
		map.next(d);
	}

	if (needMarkers)
	{
		// ensure that there is enough threads
		unsigned int nbth_prec = map.getNbThreadMarkers();
		if (nbth_prec < nbth+1)
			map.addThreadMarker(nbth+1-nbth_prec);
	}

	// prepare some space pour results
	results.resize(nbth);

	boost::barrier sync1(nbth+1);
	boost::barrier sync2(nbth+1);
	bool finished=false;
	// lauch threads
	if (needMarkers)
	{
564
		for (unsigned int i = 0; i < nbth; ++i)
565 566 567 568 569 570 571
		{
			th_funcs[i] = new ThreadFunctionResult<typename PFP::MAP,T>(func, vd[i],sync1,sync2, finished,1+i,results[i]);
			threads[i] = new boost::thread(*(th_funcs[i]));
		}
	}
	else
	{
572
		for (unsigned int i = 0; i < nbth; ++i)
573 574 575 576 577 578 579 580 581
		{
			th_funcs[i] = new ThreadFunctionResult<typename PFP::MAP,T>(func, vd[i],sync1,sync2, finished,0,results[i]);
			threads[i] = new boost::thread(*(th_funcs[i]));
		}
	}

	// and continue to traverse the map
	std::vector<Dart>* tempo = new std::vector<Dart>[nbth];

582
	for (unsigned int i = 0; i < nbth; ++i)
583 584 585 586
		tempo[i].reserve(szbuff);

	while ( d != map.end())
	{
587
		for (unsigned int i = 0; i < nbth; ++i)
588 589
			tempo[i].clear();

590 591
		unsigned int nb = 0;
		while ((d != map.end()) && (nb < nbth*szbuff) )
592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613
		{
			if (good(d) && (!cm.isMarked(d)))
			{
				cm.mark(d);
				tempo[nb%nbth].push_back(d);
				nb++;
			}
			map.next(d);
		}

		sync1.wait();
		for (unsigned int i=0; i<nbth; ++i)
			vd[i].swap(tempo[i]);
		sync2.wait();
	}

	sync1.wait();
	finished = true;
	sync2.wait();

	std::vector<T> res;
	//wait for all theads to be finished and get results
614
	for (unsigned int i = 0; i < nbth; ++i)
615 616 617 618 619 620 621 622 623 624 625 626 627
	{
		threads[i]->join();
		delete threads[i];
		th_funcs[i]->clean();
	}

	delete[] threads;
	delete[] vd;
	delete[] th_funcs;
	delete[] tempo;
}

template <typename PFP, typename T>
628
void foreach_dart_res(typename PFP::MAP& map, FunctorMapThreadedResult<typename PFP::MAP, T>& func, unsigned int nbth, unsigned int szbuff, std::vector<T>& results, bool needMarkers, const FunctorSelect& good)
629 630 631
{
	std::vector<Dart>* vd = new std::vector<Dart>[nbth];
	boost::thread** threads = new boost::thread*[nbth];
632
	ThreadFunctionResult<typename PFP::MAP,T>** th_funcs= new ThreadFunctionResult<typename PFP::MAP,T>*[nbth];
633

634
	Dart d = map.begin();
635 636 637 638 639 640

	// nbth new functions, new thread (with good darts !)
	for (unsigned int i=0; i<nbth; ++i)
		vd[i].reserve(szbuff);

	// fill each vd buffers with szbuff darts
641 642
	unsigned int nb = 0;
	while ((d != map.end()) && (nb < nbth*szbuff) )
643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664
	{
		if (good(d))
		{
			vd[nb%nbth].push_back(d);
			nb++;
		}
		map.next(d);
	}

	if (needMarkers)
	{
		// ensure that there is enough threads
		unsigned int nbth_prec = map.getNbThreadMarkers();
		if (nbth_prec < nbth+1)
			map.addThreadMarker(nbth+1-nbth_prec);
	}

	// prepare some space pour results
	results.resize(nbth);

	boost::barrier sync1(nbth+1);
	boost::barrier sync2(nbth+1);
665
	bool finished = false;
666 667 668
	// lauch threads
	if (needMarkers)
	{
669
		for (unsigned int i = 0; i < nbth; ++i)
670 671 672 673 674 675 676
		{
			th_funcs[i] = new ThreadFunctionResult<typename PFP::MAP,T>(func, vd[i],sync1,sync2, finished,1+i,results[i]);
			threads[i] = new boost::thread(*(th_funcs[i]));
		}
	}
	else
	{
677
		for (unsigned int i = 0; i < nbth; ++i)
678 679 680 681 682 683 684 685 686
		{
			th_funcs[i] = new ThreadFunctionResult<typename PFP::MAP,T>(func, vd[i],sync1,sync2, finished,0,results[i]);
			threads[i] = new boost::thread(*(th_funcs[i]));
		}
	}

	// and continue to traverse the map
	std::vector<Dart>* tempo = new std::vector<Dart>[nbth];

687
	for (unsigned int i = 0; i < nbth; ++i)
688 689
		tempo[i].reserve(szbuff);

690
	while (d != map.end())
691 692 693 694
	{
		for (unsigned int i=0; i<nbth; ++i)
			tempo[i].clear();

695 696
		unsigned int nb = 0;
		while ((d != map.end()) && (nb < nbth*szbuff) )
697 698 699 700 701 702 703 704 705 706
		{
			if (good(d))
			{
				tempo[nb%nbth].push_back(d);
				nb++;
			}
			map.next(d);
		}

		sync1.wait();
707
		for (unsigned int i = 0; i < nbth; ++i)
708 709 710 711 712 713 714 715 716 717
			vd[i].swap(tempo[i]);
		sync2.wait();
	}

	sync1.wait();
	finished = true;
	sync2.wait();

	std::vector<T> res;
	//wait for all theads to be finished and get results
718
	for (unsigned int i = 0; i < nbth; ++i)
719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734
	{
		threads[i]->join();
		delete threads[i];
		th_funcs[i]->clean();
	}

	delete[] threads;
	delete[] vd;
	delete[] th_funcs;
	delete[] tempo;
}

/**
 * Sum of per-thread results (e.g. the vector filled by a foreach_*_res call).
 * Starts from a copy of the first element and accumulates the others with operator+=.
 * @pre res is not empty (front() on an empty vector is undefined behaviour)
 * @param res the results to sum
 * @return the sum of all the elements of res
 */
template <typename T>
T sumResult(const std::vector<T>& res)
{
	assert(!res.empty());
	T sum(res.front());
	for (typename std::vector<T>::size_type i = 1; i < res.size(); ++i)
		sum += res[i];
	return sum;
}

/**
 * Component-wise sum of per-thread pair results.
 * Each component starts from the first pair and accumulates the others with operator+=.
 * @pre res is not empty (front() on an empty vector is undefined behaviour)
 * @param res the pairs to sum
 * @return the pair (sum of firsts, sum of seconds)
 */
template <typename T1, typename T2>
std::pair<T1,T2> sumPairResult(const std::vector< std::pair<T1,T2> >& res)
{
	assert(!res.empty());
	T1 sum1(res.front().first);
	T2 sum2(res.front().second);
	for (typename std::vector< std::pair<T1,T2> >::size_type i = 1; i < res.size(); ++i)
	{
		sum1 += res[i].first;
		sum2 += res[i].second;
	}
	return std::pair<T1,T2>(sum1,sum2);
}

/**
 * Maximum of per-thread results, compared with operator>.
 * @pre res is not empty (front() on an empty vector is undefined behaviour)
 * @param res the results
 * @return the first greatest element of res
 */
template <typename T>
T maxResult(const std::vector<T>& res)
{
	assert(!res.empty());
	T maxr(res.front());
	for (typename std::vector<T>::size_type i = 1; i < res.size(); ++i)
	{
		if (res[i] > maxr)
			maxr = res[i];
	}
	return maxr;
}

/**
 * Minimum of per-thread results, compared with operator<.
 * @pre res is not empty (front() on an empty vector is undefined behaviour)
 * @param res the results
 * @return the first smallest element of res
 */
template <typename T>
T minResult(const std::vector<T>& res)
{
	assert(!res.empty());
	T minr(res.front());
	for (typename std::vector<T>::size_type i = 1; i < res.size(); ++i)
	{
		if (res[i] < minr)
			minr = res[i];
	}
	return minr;
}

} // namespace Parallel

} // namespace Algo

} // namespace CGoGN