coderplay
1/30/2012 - 10:05 AM

DragonVertexTest

DragonVertexTest

  Configuration conf = getConf();                                                                                
  conf.setInt(INT_PROPERTY, 1);                                                                                  
  conf.set(STRING_PROPERTY, "VALUE");                                                                            
  conf.set(GraphJobConfig.PROPERTY, "GRAPHJOB_VALUE");                                                           
  GraphJob job = GraphJob.getInstance(conf);                                                                     
  job.setJobName("First Graph Job");                                                                             
                                                                                                                 
  DragonVertex source = new DragonVertex.Builder("source")                                                       
                                        .producer(EventProducer.class)                                           
                                        .processor(EventProcessor.class)                                         
                                        .tasks(10)                                                               
                                        .build();                                                                
  DragonVertex m1 = new DragonVertex.Builder("intermediate1")                                                    
                                    .processor(EventProcessor.class)                                             
                                    .cache("file.txt,dict.dat")                                                  
                                    .tasks(10)                                                                   
                                    .build();                                                                    
  DragonVertex m2 = new DragonVertex.Builder("intermediate2")                                                    
                                    .processor(EventProcessor.class)                                             
                                    .tasks(10)                                                                   
                                    .build();                                                                    
  DragonVertex dest = new DragonVertex.Builder("source")                                                         
                                        .processor(EventProcessor.class)                                         
                                        .tasks(10)                                                               
                                        .build();                                                                
  DirectedAcyclicGraph<DragonVertex, DragonEdge> g = new DirectedAcyclicGraph<DragonVertex, DragonEdge>();       
  // check if the graph is cycle when adding edge                                                                
  g.addEdge(source, m1).parition(HashPartitioner.class);                                                         
  g.addEdge(source, m2).parition(HashPartitioner.class);                                                         
  g.addEdge(m1, dest).parition(CustomPartitioner.class);                                                         
  g.addEdge(m2, dest).parition(CustomPartitioner.class);                                                         
  job.setGraph(serializer.serialze(g));                                                                          
  // check all source vertex hold event producers when submitting                                                
  job.submit();