当前位置:  编程技术>java/j2ee

java多线程处理执行solr创建索引示例

    来源: 互联网  发布时间:2014-11-03

    本文导语:  代码如下:public class SolrIndexer implements Indexer, Searcher, DisposableBean { //~ Static fields/initializers =============================================  static final Logger logger = LoggerFactory.getLogger(SolrIndexer.class); private static final long SHUTDOWN_TIMEOUT    = 5 * 60...

代码如下:

public class SolrIndexer implements Indexer, Searcher, DisposableBean {
 //~ Static fields/initializers =============================================

 static final Logger logger = LoggerFactory.getLogger(SolrIndexer.class);

 private static final long SHUTDOWN_TIMEOUT    = 5 * 60 * 1000L; // long enough

 private static final int  INPUT_QUEUE_LENGTH  = 16384;

 //~ Instance fields ========================================================

 private CommonsHttpSolrServer server;

 private BlockingQueue inputQueue;

 private Thread updateThread;
 volatile boolean running = true;
 volatile boolean shuttingDown = false;

 //~ Constructors ===========================================================

 public SolrIndexer(String url) throws MalformedURLException {
  server = new CommonsHttpSolrServer(url);

  inputQueue = new ArrayBlockingQueue(INPUT_QUEUE_LENGTH);

  updateThread = new Thread(new UpdateTask());
  updateThread.setName("SolrIndexer");
  updateThread.start();
 }

 //~ Methods ================================================================

 public void setSoTimeout(int timeout) {
  server.setSoTimeout(timeout);
 }

 public void setConnectionTimeout(int timeout) {
  server.setConnectionTimeout(timeout);
 }

 public void setAllowCompression(boolean allowCompression) {
  server.setAllowCompression(allowCompression);
 }


 public void addIndex(Indexable indexable) throws IndexingException {
  if (shuttingDown) {
   throw new IllegalStateException("SolrIndexer is shutting down");
  }
  inputQueue.offer(new Operation(indexable, OperationType.UPDATE));
 }
 

 public void delIndex(Indexable indexable) throws IndexingException {
  if (shuttingDown) {
   throw new IllegalStateException("SolrIndexer is shutting down");
  }
  inputQueue.offer(new Operation(indexable, OperationType.DELETE));
 }

 
 private void updateIndices(String type, List indices) throws IndexingException {
  if (indices == null || indices.size() == 0) {
   return;
  }

  logger.debug("Updating {} indices", indices.size());

  UpdateRequest req = new UpdateRequest("/" + type + "/update");
  req.setAction(UpdateRequest.ACTION.COMMIT, false, false);

  for (Indexable idx : indices) {
   Doc doc = idx.getDoc();

   SolrInputDocument solrDoc = new SolrInputDocument();
   solrDoc.setDocumentBoost(doc.getDocumentBoost());
   for (Iterator i = doc.iterator(); i.hasNext();) {
    Field field = i.next();
    solrDoc.addField(field.getName(), field.getValue(), field.getBoost());
   }

   req.add(solrDoc);   
  }

  try {
   req.process(server);   
  } catch (SolrServerException e) {
   logger.error("SolrServerException occurred", e);
   throw new IndexingException(e);
  } catch (IOException e) {
   logger.error("IOException occurred", e);
   throw new IndexingException(e);
  }
 }

 
 private void delIndices(String type, List indices) throws IndexingException {
  if (indices == null || indices.size() == 0) {
   return;
  }

  logger.debug("Deleting {} indices", indices.size());

  UpdateRequest req = new UpdateRequest("/" + type + "/update");
  req.setAction(UpdateRequest.ACTION.COMMIT, false, false);
  for (Indexable indexable : indices) {   
   req.deleteById(indexable.getDocId());
  }

  try {
   req.process(server);
  } catch (SolrServerException e) {
   logger.error("SolrServerException occurred", e);
   throw new IndexingException(e);
  } catch (IOException e) {
   logger.error("IOException occurred", e);
   throw new IndexingException(e);
  }
 }

 
 public QueryResult search(Query query) throws IndexingException {
  SolrQuery sq = new SolrQuery();
  sq.setQuery(query.getQuery());
  if (query.getFilter() != null) {
   sq.addFilterQuery(query.getFilter());
  }
  if (query.getOrderField() != null) {
   sq.addSortField(query.getOrderField(), query.getOrder() == Query.Order.DESC ? SolrQuery.ORDER.desc : SolrQuery.ORDER.asc);
  }
  sq.setStart(query.getOffset());
  sq.setRows(query.getLimit());

  QueryRequest req = new QueryRequest(sq);
  req.setPath("/" + query.getType() + "/select");

  try {
   QueryResponse rsp = req.process(server);
   SolrDocumentList docs = rsp.getResults();

   QueryResult result = new QueryResult();
   result.setOffset(docs.getStart());
   result.setTotal(docs.getNumFound());
   result.setSize(sq.getRows());

   List resultDocs = new ArrayList(result.getSize());
   for (Iterator i = docs.iterator(); i.hasNext();) {
    SolrDocument solrDocument = i.next();

    Doc doc = new Doc();
    for (Iterator iter = solrDocument.iterator(); iter.hasNext();) {
     Map.Entry field = iter.next();
     doc.addField(field.getKey(), field.getValue());
    }

    resultDocs.add(doc);
   }

   result.setDocs(resultDocs);
   return result;

  } catch (SolrServerException e) {
   logger.error("SolrServerException occurred", e);
   throw new IndexingException(e);
  }
 }
 

 public void destroy() throws Exception {
  shutdown(SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);  
 }

 public boolean shutdown(long timeout, TimeUnit unit) {
  if (shuttingDown) {
   logger.info("Suppressing duplicate attempt to shut down");
   return false;
  }
  shuttingDown = true;
  String baseName = updateThread.getName();
  updateThread.setName(baseName + " - SHUTTING DOWN");
  boolean rv = false;
  try {
   // Conditionally wait
   if (timeout > 0) {
    updateThread.setName(baseName + " - SHUTTING DOWN (waiting)");
    rv = waitForQueue(timeout, unit);
   }
  } finally {
   // But always begin the shutdown sequence
   running = false;
   updateThread.setName(baseName + " - SHUTTING DOWN (informed client)");
  }
  return rv;
 }

 /**
  * @param timeout
  * @param unit
  * @return
  */
 private boolean waitForQueue(long timeout, TimeUnit unit) {
  CountDownLatch latch = new CountDownLatch(1);
  inputQueue.add(new StopOperation(latch));
  try {
   return latch.await(timeout, unit);
  } catch (InterruptedException e) {
   throw new RuntimeException("Interrupted waiting for queues", e);
  }
 }

 

 class UpdateTask implements Runnable {
  public void run() {
   while (running) {
    try {
     syncIndices();
    } catch (Throwable e) {
     if (shuttingDown) {
      logger.warn("Exception occurred during shutdown", e);
     } else {
      logger.error("Problem handling solr indexing updating", e);
     }
    }
   }
   logger.info("Shut down SolrIndexer");
  }
 }

 void syncIndices() throws InterruptedException {
  Operation op = inputQueue.poll(1000L, TimeUnit.MILLISECONDS);

  if (op == null) {
   return;
  }

  if (op instanceof StopOperation) {
   ((StopOperation) op).stop();
   return;
  }

  // wait 1 second
  try {
   Thread.sleep(1000);
  } catch (InterruptedException e) {

  }

  List ops = new ArrayList(inputQueue.size() + 1);
  ops.add(op);
  inputQueue.drainTo(ops);

  Map deleteMap = new HashMap(4);
  Map updateMap = new HashMap(4);

  for (Operation o : ops) {
   if (o instanceof StopOperation) {
    ((StopOperation) o).stop();
   } else {
    Indexable indexable = o.indexable;
    if (o.type == OperationType.DELETE) {
     List docs = deleteMap.get(indexable.getType());
     if (docs == null) {
      docs = new LinkedList();
      deleteMap.put(indexable.getType(), docs);
     }
     docs.add(indexable);
    } else {
     List docs = updateMap.get(indexable.getType());
     if (docs == null) {
      docs = new LinkedList();
      updateMap.put(indexable.getType(), docs);
     }
     docs.add(indexable);
    }
   }
  }

  for (Iterator i = deleteMap.entrySet().iterator(); i.hasNext();) {
   Map.Entry entry = i.next();
   delIndices(entry.getKey(), entry.getValue());
  }

  for (Iterator i = updateMap.entrySet().iterator(); i.hasNext();) {
   Map.Entry entry = i.next();
   updateIndices(entry.getKey(), entry.getValue());
  }
 }

 enum OperationType { DELETE, UPDATE, SHUTDOWN }

 static class Operation {
  OperationType type;
  Indexable indexable;

  Operation() {}

  Operation(Indexable indexable, OperationType type) {
   this.indexable = indexable;
   this.type = type;
  }
 }

 static class StopOperation extends Operation {
  CountDownLatch latch;

  StopOperation(CountDownLatch latch) {
   this.latch = latch;
   this.type = OperationType.SHUTDOWN;
  }

  public void stop() {
   latch.countDown();
  }
 }

 //~ Accessors ===============

}


    
 
 

您可能感兴趣的文章:

  • andriod下java socket网络编程:java socket客户端服务端代码示例
  • 输出java进程的jstack信息示例分享 通过线程堆栈信息分析java线程
  • java Servlet实现Session创建存取以及url重写代码示例
  • java 四舍五入使java保留2位小数示例讲解
  • java进行error捕获和处理示例(java异常捕获)
  • java去除集合中重复元素示例分享 java去除重复
  • java读取csv文件示例分享(java解析csv文件)
  • java求三个数的最大值的示例分享
  • java生成字母数字组合的随机数示例 java生成随机数
  • java实现网页解析示例
  • java协变返回类型使用示例
  • 使用java执行定时任务示例
  • java自定义枚举转换器示例
  • java向文件末尾添加内容示例分享
  • java正则表达式获取url的host示例
  • java使用正则表达校验手机号码示例(手机号码正则)
  • java实现jframe透明窗体示例
  • java的split方法使用示例
  • java抓取网页数据示例
  • Oracle 使用Java Source 简单示例
  • java自定义日期转化类示例
  • Java中多线程相关类Thread介绍
  • java 线程,对当前线程(非主线程)调用sleep,为什么主线程(窗口)也没反应了
  • java基本教程之多线程基本概念 java多线程教程
  • 请问在java多线程中,是只有run(){}内的代码运行在一个新线程下呢?还是这个类中的代码都运行在一个新线程下?
  • 用什么方法可以查看在windows下jvm下运行当前java程序的线程数和线程名称?
  • java多线程编程之捕获子线程异常示例
  • java线程怎么调用java的application.class?
  • java多线程编程之使用runnable接口创建线程
  • java线程中如何降低CPU的占用率?我这几个线程会不会死锁?
  • java线程之使用Runnable接口创建线程的方法
  •  
    本站(WWW.)旨在分享和传播互联网科技相关的资讯和技术,将尽最大努力为读者提供更好的信息聚合和浏览方式。
    本站(WWW.)站内文章除注明原创外,均为转载、整理或搜集自网络。欢迎任何形式的转载,转载请注明出处。












  • 相关文章推荐
  • 高分求助,哪有好的java类库大全,最好带索引,中文
  • Java的索引-搜索引擎 IndexTank
  • java命名空间java.sql类types的类成员方法: java_object定义及介绍
  • 我想学JAVA ,是买THINK IN JAVA 还是JAVA2核心技术:卷1 好???
  • java命名空间java.awt.datatransfer类dataflavor的类成员方法: imageflavor定义及介绍
  • 请问Java高手,Java的优势在那里??,Java主要适合于开发哪类应用程序
  • java命名空间java.lang.management类managementfactory的类成员方法: getcompilationmxbean定义及介绍
  • 如何将java.util.Date转化为java.sql.Date?数据库中Date类型对应于java的哪个Date呢
  • java命名空间java.lang.management接口runtimemxbean的类成员方法: getlibrarypath定义及介绍
  • 谁有电子版的《Java编程思想第二版(Thinking in java second)》和《Java2编程详解(special edition java2)》?得到给分
  • java命名空间java.lang.management接口runtimemxbean的类成员方法: getstarttime定义及介绍
  • 本人想学java,请问java程序员的待遇如何,和java主要有几个比较强的方向
  • java命名空间java.awt.datatransfer类dataflavor的类成员方法: stringflavor定义及介绍
  • 我对JAVA一窍不通,可惜别人却给我一个Java的project,要我做一个安装程序,请问哪里有JAVA INSTALLER下载,而且我要不要安装java的sdk才能完成此项任务?
  • java命名空间java.security类keystore的类成员方法: getdefaulttype定义及介绍
  • 新年第一天,让我们讨论一下未来一年JAVA的发展趋势! 个人认为,JAVA将主要朝ERP和JAVA手机方面发展!
  • java命名空间java.lang.management接口runtimemxbean的类成员方法: getclasspath定义及介绍
  • 我想学Java,但不知道Java的实用的开发工具有那些,Java主要用在哪些方面,EJB到底是什么东西??
  • java命名空间java.awt.datatransfer类dataflavor的类成员方法: javaserializedobjectmimetype定义及介绍
  • redhat7.3下,java程序打印中文直接用java命令执行正常,用crontab执行java命令为乱码
  • java命名空间java.awt.datatransfer类dataflavor的类成员方法: javafilelistflavor定义及介绍
  • 各位学java的朋友,学java的未来是什么,你们学java都用来开发什么项目啊!来者给分!!
  • java命名空间java.lang.management接口runtimemxbean的类成员方法: getvmname定义及介绍
  • 请问java程序中的import为什么有的用java.….*,而有的又用java.….…,有什么区别吗?


  • 站内导航:


    特别声明:169IT网站部分信息来自互联网,如果侵犯您的权利,请及时告知,本站将立即删除!

    ©2012-2021,,E-mail:www_#163.com(请将#改为@)

    浙ICP备11055608号-3