/*******************************************************************************
* CGoGN: Combinatorial and Geometric modeling with Generic N-dimensional Maps  *
* version 0.1                                                                  *
* Copyright (C) 2009-2012, IGG Team, LSIIT, University of Strasbourg           *
*                                                                              *
* This library is free software; you can redistribute it and/or modify it      *
* under the terms of the GNU Lesser General Public License as published by the *
* Free Software Foundation; either version 2.1 of the License, or (at your     *
* option) any later version.                                                   *
*                                                                              *
* This library is distributed in the hope that it will be useful, but WITHOUT  *
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or        *
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License  *
* for more details.                                                            *
*                                                                              *
* You should have received a copy of the GNU Lesser General Public License     *
* along with this library; if not, write to the Free Software Foundation,      *
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA.           *
*                                                                              *
* Web site: http://cgogn.unistra.fr/                                           *
* Contact information: cgogn@unistra.fr                                        *
*                                                                              *
*******************************************************************************/
#include <boost/thread.hpp>
#include <boost/thread/barrier.hpp>
sylvain thery's avatar
sylvain thery committed
26
#include <vector>
Sylvain Thery's avatar
Sylvain Thery committed
27 28 29 30 31 32 33 34 35 36

namespace CGoGN
{

namespace Algo
{

namespace Parallel
{

37
class ThreadFunctionAttrib
38 39
{
protected:
40
	std::vector<unsigned int>& m_ids;
41 42 43
	boost::barrier& m_sync1;
	boost::barrier& m_sync2;
	bool& m_finished;
44 45 46
	unsigned int m_id;
	FunctorAttribThreaded* m_functor;

47
public:
48 49
	ThreadFunctionAttrib(FunctorAttribThreaded* func, std::vector<unsigned int>& vid, boost::barrier& s1, boost::barrier& s2, bool& finished, unsigned int id):
		m_ids(vid), m_sync1(s1), m_sync2(s2), m_finished(finished), m_id(id), m_functor(func)
50 51 52
	{
	}

53 54
	ThreadFunctionAttrib(const ThreadFunctionAttrib& tf):
		m_ids(tf.m_ids), m_sync1(tf.m_sync1), m_sync2(tf.m_sync2), m_finished(tf.m_finished), m_id(tf.m_id), m_functor(tf.m_functor){}
55 56 57 58 59

	void operator()()
	{
		while (!m_finished)
		{
60
			for (std::vector<unsigned int>::const_iterator it = m_ids.begin(); it != m_ids.end(); ++it)
61
				m_functor->run(*it,m_id);
62 63 64 65
			m_sync1.wait();
			m_sync2.wait();
		}
	}
66 67
};

68

Sylvain Thery's avatar
Sylvain Thery committed
69 70


71 72 73 74 75
/**
 *
 */
template<typename MAP>
class ThreadFunction
Sylvain Thery's avatar
Sylvain Thery committed
76 77
{
protected:
78
	std::vector<Dart>& m_darts;
Sylvain Thery's avatar
Sylvain Thery committed
79 80 81
	boost::barrier& m_sync1;
	boost::barrier& m_sync2;
	bool& m_finished;
82 83
	unsigned int m_id;
	FunctorMapThreaded<MAP>* m_functor;
Sylvain Thery's avatar
Sylvain Thery committed
84
public:
85 86
	ThreadFunction(FunctorMapThreaded<MAP>* func, std::vector<Dart>& vd, boost::barrier& s1, boost::barrier& s2, bool& finished, unsigned int id):
		m_darts(vd), m_sync1(s1), m_sync2(s2), m_finished(finished), m_id(id), m_functor(func)
87 88 89
	{
	}

90 91
	ThreadFunction(const ThreadFunction<MAP>& tf):
		m_darts(tf.m_darts), m_sync1(tf.m_sync1), m_sync2(tf.m_sync2), m_finished(tf.m_finished), m_id(tf.m_id), m_functor(tf.m_functor){}
Sylvain Thery's avatar
Sylvain Thery committed
92 93 94 95 96 97

	void operator()()
	{
		while (!m_finished)
		{
			for (std::vector<Dart>::const_iterator it = m_darts.begin(); it != m_darts.end(); ++it)
98
				m_functor->run(*it,m_id);
Sylvain Thery's avatar
Sylvain Thery committed
99 100 101 102 103 104
			m_sync1.wait();
			m_sync2.wait();
		}
	}
};

105

106 107 108 109 110 111
/**
 * @return the value reported by boost::thread::hardware_concurrency()
 */
inline unsigned int nbThreads()
{
	const unsigned int reported = boost::thread::hardware_concurrency();
	return reported;
}


112

113
inline unsigned int optimalNbThreads()
Sylvain Thery's avatar
Sylvain Thery committed
114
{
115 116 117
	unsigned int nb = nbThreads();
	if (nb>4)
		return nb/2 ;
Sylvain Thery's avatar
Sylvain Thery committed
118

119 120
	return nb;
}
Sylvain Thery's avatar
Sylvain Thery committed
121 122


123
template <typename MAP, unsigned int ORBIT>
124
void foreach_cell(MAP& map, FunctorMapThreaded<MAP>& func, unsigned int nbth, bool needMarkers, const FunctorSelect& good, unsigned int currentThread)
125 126 127
{
	if (nbth == 0)
		nbth = optimalNbThreads();
Sylvain Thery's avatar
Sylvain Thery committed
128

129 130
	std::vector<FunctorMapThreaded<MAP>*> funcs;
	funcs.reserve(nbth);
131

132 133 134
	FunctorMapThreaded<MAP>* ptr = func.duplicate();
	bool shared = (ptr == NULL);

135
	if (shared)
136
	{
137
		for (unsigned int i = 0; i < nbth; ++i)
138
			funcs.push_back(&func);
139 140 141
	}
	else
	{
142 143
		funcs.push_back(ptr);
		for (unsigned int i = 1; i < nbth; ++i)
144
			funcs.push_back(func.duplicate());
145
	}
Sylvain Thery's avatar
Sylvain Thery committed
146 147


148
	foreach_cell<MAP,ORBIT>(map,funcs,nbth,needMarkers,good,currentThread);
Sylvain Thery's avatar
Sylvain Thery committed
149

150
	if (!shared)
151
		for (unsigned int i = 0; i < nbth; ++i)
152
			delete funcs[i];
Sylvain Thery's avatar
Sylvain Thery committed
153 154
}

155 156
template <typename MAP, unsigned int ORBIT>
void foreach_cell(MAP& map, std::vector<FunctorMapThreaded<MAP>*>& funcs, unsigned int nbth, bool needMarkers, const FunctorSelect& good, unsigned int currentThread)
Sylvain Thery's avatar
Sylvain Thery committed
157
{
158 159
	assert(funcs.size() ==  nbth);

sylvain thery's avatar
sylvain thery committed
160 161
	std::vector<Dart>* vd = new std::vector<Dart>[nbth];
	boost::thread** threads = new boost::thread*[nbth];
Sylvain Thery's avatar
Sylvain Thery committed
162 163

	// nbth new functions, new thread (with good darts !)
164
	for (unsigned int i = 0; i < nbth; ++i)
165
		vd[i].reserve(SIZE_BUFFER_THREAD);
Sylvain Thery's avatar
Sylvain Thery committed
166

167 168 169 170 171 172 173 174 175 176 177

	AttributeContainer* cont = NULL;
	DartMarker* dmark = NULL;
	CellMarker<ORBIT>* cmark = NULL;
	AttributeMultiVector<Dart>* quickTraversal = map.template getQuickTraversal<ORBIT>() ;

	// fill each vd buffers with SIZE_BUFFER_THREAD darts
	Dart d;
	unsigned int di=0;

	if(quickTraversal != NULL)
Sylvain Thery's avatar
Sylvain Thery committed
178
	{
179 180 181 182 183
		cont = &(map.template getAttributeContainer<ORBIT>()) ;

		di = cont->begin();
		unsigned int nb = 0;
		while ((di != cont->end()) && (nb < nbth*SIZE_BUFFER_THREAD) )
Sylvain Thery's avatar
Sylvain Thery committed
184
		{
185 186 187 188 189 190 191
			d = quickTraversal->operator[](di);
			if (good(d))
			{
				vd[nb%nbth].push_back(d);
				nb++;
			}
			cont->next(di);
Sylvain Thery's avatar
Sylvain Thery committed
192 193
		}
	}
194
	else
195
	{
196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
		if(map.template isOrbitEmbedded<ORBIT>())
		{
			cmark = new CellMarker<ORBIT>(map, currentThread) ;

			d = map.begin();
			unsigned int nb = 0;
			while ((d != map.end()) && (nb < nbth*SIZE_BUFFER_THREAD) )
			{
				if (good(d) && (!cmark->isMarked(d)))
				{
					cmark->mark(d);
					vd[nb%nbth].push_back(d);
					nb++;
				}
				map.next(d);
			}
		}
		else
		{
			dmark = new DartMarker(map, currentThread) ;
			d = map.begin();
			unsigned int nb = 0;
			while ((d != map.end()) && (nb < nbth*SIZE_BUFFER_THREAD) )
			{
				if (good(d) && (!dmark->isMarked(d)))
				{
					dmark->markOrbit<ORBIT>(d);
					vd[nb%nbth].push_back(d);
					nb++;
				}
				map.next(d);
			}
		}
229 230
	}

Sylvain Thery's avatar
Sylvain Thery committed
231 232 233 234
	boost::barrier sync1(nbth+1);
	boost::barrier sync2(nbth+1);
	bool finished=false;
	// lauch threads
235 236
	if (needMarkers)
	{
237 238 239
		unsigned int nbth_prec = map.getNbThreadMarkers();
		if (nbth_prec < nbth+1)
			map.addThreadMarker(nbth+1-nbth_prec);
240
	}
241 242 243 244

	for (unsigned int i = 0; i < nbth; ++i)
		threads[i] = new boost::thread(ThreadFunction<MAP>(funcs[i], vd[i],sync1,sync2, finished,1+i));

Sylvain Thery's avatar
Sylvain Thery committed
245
	// and continue to traverse the map
sylvain thery's avatar
sylvain thery committed
246 247
	std::vector<Dart>* tempo = new std::vector<Dart>[nbth];

248
	for (unsigned int i = 0; i < nbth; ++i)
249
		tempo[i].reserve(SIZE_BUFFER_THREAD);
Sylvain Thery's avatar
Sylvain Thery committed
250 251


252 253 254
	if (cont)
	{
		while (di != cont->end())
Sylvain Thery's avatar
Sylvain Thery committed
255
		{
256 257 258 259
			for (unsigned int i = 0; i < nbth; ++i)
				tempo[i].clear();
			unsigned int nb = 0;
			while ((di != cont->end()) && (nb < nbth*SIZE_BUFFER_THREAD) )
Sylvain Thery's avatar
Sylvain Thery committed
260
			{
261 262 263 264 265 266 267
				d = quickTraversal->operator[](di);
				if (good(d))
				{
					tempo[nb%nbth].push_back(d);
					nb++;
				}
				cont->next(di);
Sylvain Thery's avatar
Sylvain Thery committed
268
			}
269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318
			sync1.wait();
			for (unsigned int i = 0; i < nbth; ++i)
				vd[i].swap(tempo[i]);
			sync2.wait();
		}
	}
	else if (cmark)
	{
		while (d != map.end())
		{
			for (unsigned int i = 0; i < nbth; ++i)
				tempo[i].clear();
			unsigned int nb = 0;
			while ((d != map.end()) && (nb < nbth*SIZE_BUFFER_THREAD) )
			{
				if (good(d) && (!cmark->isMarked(d)))
				{
					cmark->mark(d);
					tempo[nb%nbth].push_back(d);
					nb++;
				}
				map.next(d);
			}
			sync1.wait();
			for (unsigned int i = 0; i < nbth; ++i)
				vd[i].swap(tempo[i]);
			sync2.wait();
		}
	}
	else
	{
		while (d != map.end())
		{
			for (unsigned int i = 0; i < nbth; ++i)
				tempo[i].clear();
			unsigned int nb = 0;
			while ((d != map.end()) && (nb < nbth*SIZE_BUFFER_THREAD) )
			{
				if (good(d) && (!dmark->isMarked(d)))
				{
					dmark->markOrbit<ORBIT>(d);
					tempo[nb%nbth].push_back(d);
					nb++;
				}
				map.next(d);
			}
			sync1.wait();
			for (unsigned int i = 0; i < nbth; ++i)
				vd[i].swap(tempo[i]);
			sync2.wait();
Sylvain Thery's avatar
Sylvain Thery committed
319 320 321 322 323 324 325 326
		}
	}

	sync1.wait();
	finished = true;
	sync2.wait();

	//wait for all theads to be finished
327
	for (unsigned int i = 0; i < nbth; ++i)
Sylvain Thery's avatar
Sylvain Thery committed
328 329 330 331
	{
		threads[i]->join();
		delete threads[i];
	}
sylvain thery's avatar
sylvain thery committed
332 333 334
	delete[] threads;
	delete[] vd;
	delete[] tempo;
Sylvain Thery's avatar
Sylvain Thery committed
335

336 337
	if (cmark != NULL)
		delete cmark;
Sylvain Thery's avatar
Sylvain Thery committed
338

339 340 341
	if (dmark != NULL)
		delete dmark;
}
Sylvain Thery's avatar
Sylvain Thery committed
342 343 344



345
template <typename MAP>
346
void foreach_dart(MAP& map, FunctorMapThreaded<MAP>& func, unsigned int nbth, bool needMarkers, const FunctorSelect& good)
347 348 349
{
	if (nbth == 0)
		nbth = optimalNbThreads();
350

351 352 353
	std::vector<FunctorMapThreaded<MAP>*> funcs;
	funcs.reserve(nbth);

354 355 356
	FunctorMapThreaded<MAP>* ptr = func.duplicate();
	bool shared = (ptr == NULL);

357
	if (shared)
358
	{
359
		for (unsigned int i = 0; i < nbth; ++i)
360
			funcs.push_back(&func);
361 362 363
	}
	else
	{
364 365
		funcs.push_back(ptr);
		for (unsigned int i = 1; i < nbth; ++i)
366
			funcs.push_back(func.duplicate());
367
	}
Sylvain Thery's avatar
Sylvain Thery committed
368

369
	foreach_dart<MAP>(map,funcs,nbth,needMarkers,good);
Sylvain Thery's avatar
Sylvain Thery committed
370

371
	if (!shared)
372
		for (unsigned int i = 0; i < nbth; ++i)
373
			delete funcs[i];
Sylvain Thery's avatar
Sylvain Thery committed
374 375
}

376 377 378

template <typename MAP>
void foreach_dart(MAP& map, std::vector<FunctorMapThreaded<MAP>*> funcs, unsigned int nbth, bool needMarkers, const FunctorSelect& good)
379
{
380 381
	assert(funcs.size() ==  nbth);

382 383 384
	std::vector<Dart>* vd = new std::vector<Dart>[nbth];
	boost::thread** threads = new boost::thread*[nbth];

385
	Dart d = map.begin();
386 387

	// nbth new functions, new thread (with good darts !)
388
	for (unsigned int i = 0; i < nbth; ++i)
389
		vd[i].reserve(SIZE_BUFFER_THREAD);
390 391

	// fill each vd buffers with 4096 darts
392
	unsigned int nb = 0;
393
	while ((d != map.end()) && (nb < nbth*SIZE_BUFFER_THREAD) )
394
	{
395
		if (good(d))
396 397 398 399 400 401 402
		{
			vd[nb%nbth].push_back(d);
			nb++;
		}
		map.next(d);
	}

403 404 405 406
	boost::barrier sync1(nbth+1);
	boost::barrier sync2(nbth+1);
	bool finished = false;
	// lauch threads
407 408 409 410 411 412 413
	if (needMarkers)
	{
		unsigned int nbth_prec = map.getNbThreadMarkers();
		if (nbth_prec < nbth+1)
			map.addThreadMarker(nbth+1-nbth_prec);
	}

414
	for (unsigned int i = 0; i < nbth; ++i)
415
	{
416 417
//		funcs[i]->setThreadID(1+i);
		threads[i] = new boost::thread(ThreadFunction<MAP>(funcs[i], vd[i],sync1,sync2, finished,1+i));
418 419 420 421
	}

	// and continue to traverse the map
	std::vector<Dart>* tempo = new std::vector<Dart>[nbth];
422
	for (unsigned int i = 0; i < nbth; ++i)
423
		tempo[i].reserve(SIZE_BUFFER_THREAD);
424

425
	while (d != map.end())
426
	{
427
		for (unsigned int i = 0; i < nbth; ++i)
428 429
			tempo[i].clear();

430 431
		unsigned int nb =0;
		while ((d != map.end()) && (nb < nbth*SIZE_BUFFER_THREAD) )
432
		{
433
			if (good(d))
434 435 436 437 438 439 440 441
			{
				tempo[nb%nbth].push_back(d);
				nb++;
			}
			map.next(d);
		}

		sync1.wait();
442
		for (unsigned int i = 0; i < nbth; ++i)
443 444 445 446 447 448 449 450
			vd[i].swap(tempo[i]);
		sync2.wait();
	}

	sync1.wait();
	finished = true;
	sync2.wait();

451
	//wait for all theads to be finished
452
	for (unsigned int i = 0; i < nbth; ++i)
453 454 455 456
	{
		threads[i]->join();
		delete threads[i];
	}
457 458 459 460
	
	delete vd;
	delete threads;
	delete tempo;
461 462 463 464
}



465
/**
 * Parallel traversal of an attribute container, starting from one functor.
 *
 * One functor per worker thread is prepared, then the work is delegated to
 * the foreach_attrib overload that takes a vector of functors. When
 * func.duplicate() returns NULL every worker uses func itself; otherwise each
 * worker receives its own duplicate and the duplicates are deleted after the
 * traversal.
 *
 * @param attr_cont the container to traverse
 * @param func functor to apply (or to duplicate)
 * @param nbth number of worker threads, 0 selects optimalNbThreads()
 */
inline void foreach_attrib(AttributeContainer& attr_cont, FunctorAttribThreaded& func, unsigned int nbth)
{
	nbth = (nbth != 0) ? nbth : optimalNbThreads();

	std::vector<FunctorAttribThreaded*> perThread;
	perThread.reserve(nbth);

	// a NULL duplicate means the functor is used as is by every thread
	FunctorAttribThreaded* const firstCopy = func.duplicate();
	const bool ownsCopies = (firstCopy != NULL);

	if (ownsCopies)
	{
		perThread.push_back(firstCopy);
		for (unsigned int k = 1; k < nbth; ++k)
			perThread.push_back(func.duplicate());
	}
	else
	{
		perThread.assign(nbth, &func);
	}

	foreach_attrib(attr_cont, perThread, nbth);

	// only the duplicates created above are released
	if (ownsCopies)
	{
		for (unsigned int k = 0; k < nbth; ++k)
			delete perThread[k];
	}
}

496 497

/**
 * Parallel traversal of an attribute container with one functor per thread.
 *
 * The calling thread walks the container and distributes the indices
 * round-robin into per-thread buffers, while nbth worker threads
 * (ThreadFunctionAttrib) run their functor on the previously filled buffers.
 * A single array of 2*nbth buffers is used: the first nbth are read by the
 * workers, the last nbth are filled by the calling thread, and the two halves
 * are swapped between the two barriers.
 *
 * @param attr_cont the container to traverse
 * @param funcs one functor per worker thread (size must be nbth)
 * @param nbth number of worker threads
 */
inline void foreach_attrib(AttributeContainer& attr_cont, std::vector<FunctorAttribThreaded*> funcs, unsigned int nbth)
{
	assert(funcs.size() ==  nbth);

	std::vector<unsigned int >* buffers = new std::vector<unsigned int>[2*nbth];
	boost::thread** workers = new boost::thread*[nbth];

	for (unsigned int b = 0; b < 2*nbth; ++b)
		buffers[b].reserve(SIZE_BUFFER_THREAD);

	// second half of the array: buffers filled while the workers are busy
	std::vector<unsigned int >* const staged = buffers + nbth;

	// initial fill: at most SIZE_BUFFER_THREAD indices per worker
	unsigned int id = attr_cont.begin();
	const unsigned int roundCapacity = nbth*SIZE_BUFFER_THREAD;
	for (unsigned int filled = 0; (id != attr_cont.end()) && (filled < roundCapacity); ++filled, attr_cont.next(id))
		buffers[filled%nbth].push_back(id);

	// the calling thread takes part in both barriers, hence nbth+1
	boost::barrier roundDone(nbth+1);
	boost::barrier resume(nbth+1);
	bool finished = false;

	// launch the workers, their identifiers start at 1
	for (unsigned int t = 0; t < nbth; ++t)
		workers[t] = new boost::thread(ThreadFunctionAttrib(funcs[t], buffers[t], roundDone, resume, finished, 1+t));

	while (id != attr_cont.end())
	{
		for (unsigned int t = 0; t < nbth; ++t)
			staged[t].clear();

		for (unsigned int filled = 0; (id != attr_cont.end()) && (filled < roundCapacity); ++filled, attr_cont.next(id))
			staged[filled%nbth].push_back(id);

		roundDone.wait();
		for (unsigned int t = 0; t < nbth; ++t)
			buffers[t].swap(staged[t]);
		resume.wait();
	}

	// last round: the flag is raised between the two barriers so that every
	// worker sees it when it tests its loop condition again
	roundDone.wait();
	finished = true;
	resume.wait();

	// wait for all threads to be finished
	for (unsigned int t = 0; t < nbth; ++t)
	{
		workers[t]->join();
		delete workers[t];
	}
	delete[] workers;
	delete[] buffers;
}



561
// TODO same modification for transparent usage of dart marker / cell marker / quick traversal
562

563 564
template <typename MAP, unsigned int CELL>
void foreach_cell2Pass(MAP& map, std::vector<FunctorMapThreaded<MAP>*>& funcsFrontnBack, unsigned int nbLoops, unsigned int nbth, bool needMarkers, const FunctorSelect& good)
565
{
566
	std::vector<Dart>* vd = new std::vector<Dart>[2*nbth];
567
	for (unsigned int i = 0; i < nbth; ++i)
568
		vd[i].reserve(SIZE_BUFFER_THREAD);
569

570
	boost::thread** threadsAB = new boost::thread*[2*nbth];
571 572 573 574 575 576 577 578 579

	if (needMarkers)
	{
		// ensure that there is enough threads
		unsigned int nbth_prec = map.getNbThreadMarkers();
		if (nbth_prec < nbth+1)
			map.addThreadMarker(nbth+1-nbth_prec);
	}

580 581 582 583
	CellMarkerNoUnmark<CELL> cm(map); // for 2 pass front mark / back unmark

	boost::barrier sync1(nbth+1);
	boost::barrier sync2(nbth+1);
584 585 586 587 588 589

	for (unsigned int loop=0; loop< nbLoops; ++loop)
	{
		// PASS FRONT (A)
		{
			Dart d = map.begin();
590
			// fill each vd buffers with SIZE_BUFFER_THREAD darts
591
			unsigned int nb = 0;
592
			while ((d != map.end()) && (nb < nbth*SIZE_BUFFER_THREAD) )
593 594 595 596 597 598 599 600 601 602 603
			{
				if (good(d) && (!cm.isMarked(d)))
				{
					cm.mark(d);
					vd[nb%nbth].push_back(d);
					nb++;
				}
				map.next(d);
			}

			bool finished=false;
604 605 606 607 608
			// lauch threads funcsFrontnBack

			for (unsigned int i = 0; i < nbth; ++i)
				threadsAB[i] = new boost::thread(ThreadFunction<MAP>(funcsFrontnBack[i], vd[i], sync1, sync2, finished,1+i));

609 610 611 612 613
			// and continue to traverse the map

			while (d != map.end())
			{
				for (unsigned int i = 0; i < nbth; ++i)
614
					vd[nbth+i].clear();
615 616

				unsigned int nb = 0;
617
				while ((d != map.end()) && (nb < nbth*SIZE_BUFFER_THREAD) )
618 619 620 621
				{
					if (good(d) && (!cm.isMarked(d)))
					{
						cm.mark(d);
622
						vd[nbth+nb%nbth].push_back(d);
623 624 625 626 627 628 629
						nb++;
					}
					map.next(d);
				}

				sync1.wait();
				for (unsigned int i = 0; i < nbth; ++i)
630
					vd[i].swap(vd[nbth+i]);
631 632 633 634 635 636 637 638 639
				sync2.wait();
			}

			sync1.wait();
			finished = true;
			sync2.wait();

			//wait for all theads to be finished
			for (unsigned int i = 0; i < nbth; ++i)
640
				threadsAB[i]->join();
641 642 643 644 645 646
		}
		// PASS BACK (B)
		{
			for (unsigned int i = 0; i < nbth; ++i)
				vd[i].clear();
			Dart d = map.begin();
647
			// fill each vd buffers with SIZE_BUFFER_THREAD darts
648
			unsigned int nb = 0;
649
			while ((d != map.end()) && (nb < nbth*SIZE_BUFFER_THREAD) )
650 651 652 653 654 655 656 657 658 659 660
			{
				if (good(d) && (cm.isMarked(d)))
				{
					cm.unmark(d);
					vd[nb%nbth].push_back(d);
					nb++;
				}
				map.next(d);
			}

			bool finished=false;
661 662 663
			for (unsigned int i = 0; i < nbth; ++i)
				threadsAB[nbth+i] = new boost::thread(ThreadFunction<MAP>(funcsFrontnBack[nbth+i], vd[i],sync1,sync2, finished,1+i));

664 665 666 667 668
			// and continue to traverse the map

			while (d != map.end())
			{
				for (unsigned int i = 0; i < nbth; ++i)
669
					vd[nbth+i].clear();
670 671

				unsigned int nb = 0;
672
				while ((d != map.end()) && (nb < nbth*SIZE_BUFFER_THREAD) )
673 674 675 676
				{
					if (good(d) && (cm.isMarked(d)))
					{
						cm.unmark(d);
677
						vd[nbth+nb%nbth].push_back(d);
678 679 680 681 682 683 684
						nb++;
					}
					map.next(d);
				}

				sync1.wait();
				for (unsigned int i = 0; i < nbth; ++i)
685
					vd[i].swap(vd[nbth+i]);
686 687 688 689 690 691 692 693 694
				sync2.wait();
			}

			sync1.wait();
			finished = true;
			sync2.wait();

			//wait for all theads to be finished
			for (unsigned int i = 0; i < nbth; ++i)
695
				threadsAB[nbth+i]->join();
696 697 698 699
		}
	}

	// free buffers and threads
700
	for (unsigned int i = 0; i < 2*nbth; ++i)
701
	{
702
		delete threadsAB[i];
703
	}
704
	delete[] threadsAB;
705 706 707 708 709
	delete[] vd;
}



710
template <typename MAP, unsigned int CELL>
711
void foreach_cell2Pass(MAP& map, FunctorMapThreaded<MAP>& funcFront, FunctorMapThreaded<MAP>& funcBack, unsigned int nbLoops, unsigned int nbth, bool needMarkers, const FunctorSelect& good)
712 713 714
{
	if (nbth == 0)
		nbth = optimalNbThreads();
715

716 717
	std::vector<FunctorMapThreaded<MAP>*> funcs;
	funcs.reserve(nbth);
718

719 720 721
	FunctorMapThreaded<MAP>* ptr = funcFront.duplicate();
	bool shared = (ptr == NULL);

722 723 724 725 726 727 728 729 730
	if (shared)
	{
		for (unsigned int i = 0; i < nbth; ++i)
			funcs.push_back(&funcFront);
		for (unsigned int i = 0; i < nbth; ++i)
			funcs.push_back(&funcBack);
	}
	else
	{
731 732
		funcs.push_back(ptr);
		for (unsigned int i = 1; i < nbth; ++i)
733 734 735
			funcs.push_back(funcFront.duplicate());
		for (unsigned int i = 0; i < nbth; ++i)
			funcs.push_back(funcBack.duplicate());
736

737
	}
738

739
	foreach_cell2Pass<MAP,CELL>(map,funcs,nbLoops,nbth,needMarkers,good);
740

741
	if (!shared)
742
		for (unsigned int i = 0; i < 2*nbth; ++i)
743 744
			delete funcs[i];
}
745 746


747
} // namespace Parallel
748

749
} // namespace Algo
750

751
} // namespace CGoGN